LCOV - code coverage report
Current view: top level - src/test/modules/libpq_pipeline - libpq_pipeline.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 864 1206 71.6 %
Date: 2025-04-01 15:15:16 Functions: 20 23 87.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * libpq_pipeline.c
       4             :  *      Verify libpq pipeline execution functionality
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *      src/test/modules/libpq_pipeline/libpq_pipeline.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres_fe.h"
      17             : 
      18             : #include <sys/select.h>
      19             : #include <sys/time.h>
      20             : 
      21             : #include "catalog/pg_type_d.h"
      22             : #include "libpq-fe.h"
      23             : #include "pg_getopt.h"
      24             : 
      25             : 
      26             : static void exit_nicely(PGconn *conn);
      27             : pg_noreturn static void pg_fatal_impl(int line, const char *fmt,...)
      28             :             pg_attribute_printf(2, 3);
      29             : static bool process_result(PGconn *conn, PGresult *res, int results,
      30             :                            int numsent);
      31             : 
      32             : static const char *const progname = "libpq_pipeline";
      33             : 
      34             : /* Options and defaults */
      35             : static char *tracefile = NULL;  /* path to PQtrace() file */
      36             : 
      37             : 
      38             : #ifdef DEBUG_OUTPUT
      39             : #define pg_debug(...)  do { fprintf(stderr, __VA_ARGS__); } while (0)
      40             : #else
      41             : #define pg_debug(...)
      42             : #endif
      43             : 
      44             : static const char *const drop_table_sql =
      45             : "DROP TABLE IF EXISTS pq_pipeline_demo";
      46             : static const char *const create_table_sql =
      47             : "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
      48             : "int8filler int8);";
      49             : static const char *const insert_sql =
      50             : "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
      51             : static const char *const insert_sql2 =
      52             : "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
      53             : 
      54             : /* max char length of an int32/64, plus sign and null terminator */
      55             : #define MAXINTLEN 12
      56             : #define MAXINT8LEN 20
      57             : 
      58             : static void
      59           0 : exit_nicely(PGconn *conn)
      60             : {
      61           0 :     PQfinish(conn);
      62           0 :     exit(1);
      63             : }
      64             : 
      65             : /*
      66             :  * The following few functions are wrapped in macros to make the reported line
      67             :  * number in an error match the line number of the invocation.
      68             :  */
      69             : 
      70             : /*
      71             :  * Print an error to stderr and terminate the program.
      72             :  */
      73             : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
      74             : pg_noreturn static void
      75           0 : pg_fatal_impl(int line, const char *fmt,...)
      76             : {
      77             :     va_list     args;
      78             : 
      79           0 :     fflush(stdout);
      80             : 
      81           0 :     fprintf(stderr, "\n%s:%d: ", progname, line);
      82           0 :     va_start(args, fmt);
      83           0 :     vfprintf(stderr, fmt, args);
      84           0 :     va_end(args);
      85             :     Assert(fmt[strlen(fmt) - 1] != '\n');
      86           0 :     fprintf(stderr, "\n");
      87           0 :     exit(1);
      88             : }
      89             : 
      90             : /*
      91             :  * Check that the query on the given connection got canceled.
      92             :  */
      93             : #define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
      94             : static void
      95          12 : confirm_query_canceled_impl(int line, PGconn *conn)
      96             : {
      97          12 :     PGresult   *res = NULL;
      98             : 
      99          12 :     res = PQgetResult(conn);
     100          12 :     if (res == NULL)
     101           0 :         pg_fatal_impl(line, "PQgetResult returned null: %s",
     102             :                       PQerrorMessage(conn));
     103          12 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
     104           0 :         pg_fatal_impl(line, "query did not fail when it was expected");
     105          12 :     if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
     106           0 :         pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
     107             :                       PQerrorMessage(conn));
     108          12 :     PQclear(res);
     109             : 
     110          12 :     while (PQisBusy(conn))
     111           0 :         PQconsumeInput(conn);
     112          12 : }
     113             : 
     114             : /*
     115             :  * Using monitorConn, query pg_stat_activity to see that the connection with
     116             :  * the given PID is either in the given state, or waiting on the given event
     117             :  * (only one of them can be given).
     118             :  */
     119             : static void
     120          24 : wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
     121             :                           char *state, char *event)
     122             : {
     123          24 :     const Oid   paramTypes[] = {INT4OID, TEXTOID};
     124             :     const char *paramValues[2];
     125          24 :     char       *pidstr = psprintf("%d", procpid);
     126             : 
     127             :     Assert((state == NULL) ^ (event == NULL));
     128             : 
     129          24 :     paramValues[0] = pidstr;
     130          24 :     paramValues[1] = state ? state : event;
     131             : 
     132             :     while (true)
     133           0 :     {
     134             :         PGresult   *res;
     135             :         char       *value;
     136             : 
     137          24 :         if (state != NULL)
     138          12 :             res = PQexecParams(monitorConn,
     139             :                                "SELECT count(*) FROM pg_stat_activity WHERE "
     140             :                                "pid = $1 AND state = $2",
     141             :                                2, paramTypes, paramValues, NULL, NULL, 0);
     142             :         else
     143          12 :             res = PQexecParams(monitorConn,
     144             :                                "SELECT count(*) FROM pg_stat_activity WHERE "
     145             :                                "pid = $1 AND wait_event = $2",
     146             :                                2, paramTypes, paramValues, NULL, NULL, 0);
     147             : 
     148          24 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     149           0 :             pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
     150          24 :         if (PQntuples(res) != 1)
     151           0 :             pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
     152          24 :         if (PQnfields(res) != 1)
     153           0 :             pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
     154          24 :         value = PQgetvalue(res, 0, 0);
     155          24 :         if (strcmp(value, "0") != 0)
     156             :         {
     157          24 :             PQclear(res);
     158          24 :             break;
     159             :         }
     160           0 :         PQclear(res);
     161             : 
     162             :         /* wait 10ms before polling again */
     163           0 :         pg_usleep(10000);
     164             :     }
     165             : 
     166          24 :     pfree(pidstr);
     167          24 : }
     168             : 
     169             : #define send_cancellable_query(conn, monitorConn) \
     170             :     send_cancellable_query_impl(__LINE__, conn, monitorConn)
     171             : static void
     172          12 : send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
     173             : {
     174             :     const char *env_wait;
     175          12 :     const Oid   paramTypes[1] = {INT4OID};
     176             : 
     177             :     /*
     178             :      * Wait for the connection to be idle, so that our check for an active
     179             :      * connection below is reliable, instead of possibly seeing an outdated
     180             :      * state.
     181             :      */
     182          12 :     wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
     183             : 
     184          12 :     env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
     185          12 :     if (env_wait == NULL)
     186          12 :         env_wait = "180";
     187             : 
     188          12 :     if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
     189             :                           &env_wait, NULL, NULL, 0) != 1)
     190           0 :         pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
     191             : 
     192             :     /*
     193             :      * Wait for the sleep to be active, because if the query is not running
     194             :      * yet, the cancel request that we send won't have any effect.
     195             :      */
     196          12 :     wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
     197          12 : }
     198             : 
     199             : /*
     200             :  * Create a new connection with the same conninfo as the given one.
     201             :  */
     202             : static PGconn *
     203           2 : copy_connection(PGconn *conn)
     204             : {
     205             :     PGconn     *copyConn;
     206           2 :     PQconninfoOption *opts = PQconninfo(conn);
     207             :     const char **keywords;
     208             :     const char **vals;
     209           2 :     int         nopts = 1;
     210           2 :     int         i = 0;
     211             : 
     212          96 :     for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
     213          94 :         nopts++;
     214             : 
     215           2 :     keywords = pg_malloc(sizeof(char *) * nopts);
     216           2 :     vals = pg_malloc(sizeof(char *) * nopts);
     217             : 
     218          96 :     for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
     219             :     {
     220          94 :         if (opt->val)
     221             :         {
     222          38 :             keywords[i] = opt->keyword;
     223          38 :             vals[i] = opt->val;
     224          38 :             i++;
     225             :         }
     226             :     }
     227           2 :     keywords[i] = vals[i] = NULL;
     228             : 
     229           2 :     copyConn = PQconnectdbParams(keywords, vals, false);
     230             : 
     231           2 :     if (PQstatus(copyConn) != CONNECTION_OK)
     232           0 :         pg_fatal("Connection to database failed: %s",
     233             :                  PQerrorMessage(copyConn));
     234             : 
     235           2 :     return copyConn;
     236             : }
     237             : 
     238             : /*
     239             :  * Test query cancellation routines
     240             :  */
     241             : static void
     242           2 : test_cancel(PGconn *conn)
     243             : {
     244             :     PGcancel   *cancel;
     245             :     PGcancelConn *cancelConn;
     246             :     PGconn     *monitorConn;
     247             :     char        errorbuf[256];
     248             : 
     249           2 :     fprintf(stderr, "test cancellations... ");
     250             : 
     251           2 :     if (PQsetnonblocking(conn, 1) != 0)
     252           0 :         pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
     253             : 
     254             :     /*
     255             :      * Make a separate connection to the database to monitor the query on the
     256             :      * main connection.
     257             :      */
     258           2 :     monitorConn = copy_connection(conn);
     259             :     Assert(PQstatus(monitorConn) == CONNECTION_OK);
     260             : 
     261             :     /* test PQcancel */
     262           2 :     send_cancellable_query(conn, monitorConn);
     263           2 :     cancel = PQgetCancel(conn);
     264           2 :     if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
     265           0 :         pg_fatal("failed to run PQcancel: %s", errorbuf);
     266           2 :     confirm_query_canceled(conn);
     267             : 
     268             :     /* PGcancel object can be reused for the next query */
     269           2 :     send_cancellable_query(conn, monitorConn);
     270           2 :     if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
     271           0 :         pg_fatal("failed to run PQcancel: %s", errorbuf);
     272           2 :     confirm_query_canceled(conn);
     273             : 
     274           2 :     PQfreeCancel(cancel);
     275             : 
     276             :     /* test PQrequestCancel */
     277           2 :     send_cancellable_query(conn, monitorConn);
     278           2 :     if (!PQrequestCancel(conn))
     279           0 :         pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
     280           2 :     confirm_query_canceled(conn);
     281             : 
     282             :     /* test PQcancelBlocking */
     283           2 :     send_cancellable_query(conn, monitorConn);
     284           2 :     cancelConn = PQcancelCreate(conn);
     285           2 :     if (!PQcancelBlocking(cancelConn))
     286           0 :         pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
     287           2 :     confirm_query_canceled(conn);
     288           2 :     PQcancelFinish(cancelConn);
     289             : 
     290             :     /* test PQcancelCreate and then polling with PQcancelPoll */
     291           2 :     send_cancellable_query(conn, monitorConn);
     292           2 :     cancelConn = PQcancelCreate(conn);
     293           2 :     if (!PQcancelStart(cancelConn))
     294           0 :         pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     295             :     while (true)
     296           2 :     {
     297             :         struct timeval tv;
     298             :         fd_set      input_mask;
     299             :         fd_set      output_mask;
     300           4 :         PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
     301           4 :         int         sock = PQcancelSocket(cancelConn);
     302             : 
     303           4 :         if (pollres == PGRES_POLLING_OK)
     304           2 :             break;
     305             : 
     306           2 :         FD_ZERO(&input_mask);
     307           2 :         FD_ZERO(&output_mask);
     308           2 :         switch (pollres)
     309             :         {
     310           2 :             case PGRES_POLLING_READING:
     311             :                 pg_debug("polling for reads\n");
     312           2 :                 FD_SET(sock, &input_mask);
     313           2 :                 break;
     314           0 :             case PGRES_POLLING_WRITING:
     315             :                 pg_debug("polling for writes\n");
     316           0 :                 FD_SET(sock, &output_mask);
     317           0 :                 break;
     318           0 :             default:
     319           0 :                 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     320             :         }
     321             : 
     322           2 :         if (sock < 0)
     323           0 :             pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
     324             : 
     325           2 :         tv.tv_sec = 3;
     326           2 :         tv.tv_usec = 0;
     327             : 
     328             :         while (true)
     329             :         {
     330           2 :             if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
     331             :             {
     332           0 :                 if (errno == EINTR)
     333           0 :                     continue;
     334           0 :                 pg_fatal("select() failed: %m");
     335             :             }
     336           2 :             break;
     337             :         }
     338             :     }
     339           2 :     if (PQcancelStatus(cancelConn) != CONNECTION_OK)
     340           0 :         pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
     341           2 :     confirm_query_canceled(conn);
     342             : 
     343             :     /*
     344             :      * test PQcancelReset works on the cancel connection and it can be reused
     345             :      * afterwards
     346             :      */
     347           2 :     PQcancelReset(cancelConn);
     348             : 
     349           2 :     send_cancellable_query(conn, monitorConn);
     350           2 :     if (!PQcancelStart(cancelConn))
     351           0 :         pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     352             :     while (true)
     353           2 :     {
     354             :         struct timeval tv;
     355             :         fd_set      input_mask;
     356             :         fd_set      output_mask;
     357           4 :         PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
     358           4 :         int         sock = PQcancelSocket(cancelConn);
     359             : 
     360           4 :         if (pollres == PGRES_POLLING_OK)
     361           2 :             break;
     362             : 
     363           2 :         FD_ZERO(&input_mask);
     364           2 :         FD_ZERO(&output_mask);
     365           2 :         switch (pollres)
     366             :         {
     367           2 :             case PGRES_POLLING_READING:
     368             :                 pg_debug("polling for reads\n");
     369           2 :                 FD_SET(sock, &input_mask);
     370           2 :                 break;
     371           0 :             case PGRES_POLLING_WRITING:
     372             :                 pg_debug("polling for writes\n");
     373           0 :                 FD_SET(sock, &output_mask);
     374           0 :                 break;
     375           0 :             default:
     376           0 :                 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     377             :         }
     378             : 
     379           2 :         if (sock < 0)
     380           0 :             pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
     381             : 
     382           2 :         tv.tv_sec = 3;
     383           2 :         tv.tv_usec = 0;
     384             : 
     385             :         while (true)
     386             :         {
     387           2 :             if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
     388             :             {
     389           0 :                 if (errno == EINTR)
     390           0 :                     continue;
     391           0 :                 pg_fatal("select() failed: %m");
     392             :             }
     393           2 :             break;
     394             :         }
     395             :     }
     396           2 :     if (PQcancelStatus(cancelConn) != CONNECTION_OK)
     397           0 :         pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
     398           2 :     confirm_query_canceled(conn);
     399             : 
     400           2 :     PQcancelFinish(cancelConn);
     401             : 
     402           2 :     fprintf(stderr, "ok\n");
     403           2 : }
     404             : 
     405             : static void
     406           2 : test_disallowed_in_pipeline(PGconn *conn)
     407             : {
     408           2 :     PGresult   *res = NULL;
     409             : 
     410           2 :     fprintf(stderr, "test error cases... ");
     411             : 
     412           2 :     if (PQisnonblocking(conn))
     413           0 :         pg_fatal("Expected blocking connection mode");
     414             : 
     415           2 :     if (PQenterPipelineMode(conn) != 1)
     416           0 :         pg_fatal("Unable to enter pipeline mode");
     417             : 
     418           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     419           0 :         pg_fatal("Pipeline mode not activated properly");
     420             : 
     421             :     /* PQexec should fail in pipeline mode */
     422           2 :     res = PQexec(conn, "SELECT 1");
     423           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
     424           0 :         pg_fatal("PQexec should fail in pipeline mode but succeeded");
     425           2 :     if (strcmp(PQerrorMessage(conn),
     426             :                "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
     427           0 :         pg_fatal("did not get expected error message; got: \"%s\"",
     428             :                  PQerrorMessage(conn));
     429             : 
     430             :     /* PQsendQuery should fail in pipeline mode */
     431           2 :     if (PQsendQuery(conn, "SELECT 1") != 0)
     432           0 :         pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
     433           2 :     if (strcmp(PQerrorMessage(conn),
     434             :                "PQsendQuery not allowed in pipeline mode\n") != 0)
     435           0 :         pg_fatal("did not get expected error message; got: \"%s\"",
     436             :                  PQerrorMessage(conn));
     437             : 
     438             :     /* Entering pipeline mode when already in pipeline mode is OK */
     439           2 :     if (PQenterPipelineMode(conn) != 1)
     440           0 :         pg_fatal("re-entering pipeline mode should be a no-op but failed");
     441             : 
     442           2 :     if (PQisBusy(conn) != 0)
     443           0 :         pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
     444             : 
     445             :     /* ok, back to normal command mode */
     446           2 :     if (PQexitPipelineMode(conn) != 1)
     447           0 :         pg_fatal("couldn't exit idle empty pipeline mode");
     448             : 
     449           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     450           0 :         pg_fatal("Pipeline mode not terminated properly");
     451             : 
     452             :     /* exiting pipeline mode when not in pipeline mode should be a no-op */
     453           2 :     if (PQexitPipelineMode(conn) != 1)
     454           0 :         pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
     455             : 
     456             :     /* can now PQexec again */
     457           2 :     res = PQexec(conn, "SELECT 1");
     458           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     459           0 :         pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
     460             :                  PQerrorMessage(conn));
     461             : 
     462           2 :     fprintf(stderr, "ok\n");
     463           2 : }
     464             : 
     465             : static void
     466           2 : test_multi_pipelines(PGconn *conn)
     467             : {
     468           2 :     PGresult   *res = NULL;
     469           2 :     const char *dummy_params[1] = {"1"};
     470           2 :     Oid         dummy_param_oids[1] = {INT4OID};
     471             : 
     472           2 :     fprintf(stderr, "multi pipeline... ");
     473             : 
     474             :     /*
     475             :      * Queue up a couple of small pipelines and process each without returning
     476             :      * to command mode first.
     477             :      */
     478           2 :     if (PQenterPipelineMode(conn) != 1)
     479           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     480             : 
     481             :     /* first pipeline */
     482           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     483             :                           dummy_params, NULL, NULL, 0) != 1)
     484           0 :         pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
     485             : 
     486           2 :     if (PQpipelineSync(conn) != 1)
     487           0 :         pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
     488             : 
     489             :     /* second pipeline */
     490           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     491             :                           dummy_params, NULL, NULL, 0) != 1)
     492           0 :         pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
     493             : 
     494             :     /* Skip flushing once. */
     495           2 :     if (PQsendPipelineSync(conn) != 1)
     496           0 :         pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
     497             : 
     498             :     /* third pipeline */
     499           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     500             :                           dummy_params, NULL, NULL, 0) != 1)
     501           0 :         pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
     502             : 
     503           2 :     if (PQpipelineSync(conn) != 1)
     504           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     505             : 
     506             :     /* OK, start processing the results */
     507             : 
     508             :     /* first pipeline */
     509             : 
     510           2 :     res = PQgetResult(conn);
     511           2 :     if (res == NULL)
     512           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     513             :                  PQerrorMessage(conn));
     514             : 
     515           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     516           0 :         pg_fatal("Unexpected result code %s from first pipeline item",
     517             :                  PQresStatus(PQresultStatus(res)));
     518           2 :     PQclear(res);
     519           2 :     res = NULL;
     520             : 
     521           2 :     if (PQgetResult(conn) != NULL)
     522           0 :         pg_fatal("PQgetResult returned something extra after first result");
     523             : 
     524           2 :     if (PQexitPipelineMode(conn) != 0)
     525           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
     526             : 
     527           2 :     res = PQgetResult(conn);
     528           2 :     if (res == NULL)
     529           0 :         pg_fatal("PQgetResult returned null when sync result expected: %s",
     530             :                  PQerrorMessage(conn));
     531             : 
     532           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     533           0 :         pg_fatal("Unexpected result code %s instead of sync result, error: %s",
     534             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     535           2 :     PQclear(res);
     536             : 
     537             :     /* second pipeline */
     538             : 
     539           2 :     res = PQgetResult(conn);
     540           2 :     if (res == NULL)
     541           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     542             :                  PQerrorMessage(conn));
     543             : 
     544           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     545           0 :         pg_fatal("Unexpected result code %s from second pipeline item",
     546             :                  PQresStatus(PQresultStatus(res)));
     547           2 :     PQclear(res);
     548           2 :     res = NULL;
     549             : 
     550           2 :     if (PQgetResult(conn) != NULL)
     551           0 :         pg_fatal("PQgetResult returned something extra after first result");
     552             : 
     553           2 :     if (PQexitPipelineMode(conn) != 0)
     554           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
     555             : 
     556           2 :     res = PQgetResult(conn);
     557           2 :     if (res == NULL)
     558           0 :         pg_fatal("PQgetResult returned null when sync result expected: %s",
     559             :                  PQerrorMessage(conn));
     560             : 
     561           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     562           0 :         pg_fatal("Unexpected result code %s instead of sync result, error: %s",
     563             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     564           2 :     PQclear(res);
     565             : 
     566             :     /* third pipeline */
     567             : 
     568           2 :     res = PQgetResult(conn);
     569           2 :     if (res == NULL)
     570           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     571             :                  PQerrorMessage(conn));
     572             : 
     573           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     574           0 :         pg_fatal("Unexpected result code %s from third pipeline item",
     575             :                  PQresStatus(PQresultStatus(res)));
     576             : 
     577           2 :     res = PQgetResult(conn);
     578           2 :     if (res != NULL)
     579           0 :         pg_fatal("Expected null result, got %s",
     580             :                  PQresStatus(PQresultStatus(res)));
     581             : 
     582           2 :     res = PQgetResult(conn);
     583           2 :     if (res == NULL)
     584           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     585             :                  PQerrorMessage(conn));
     586             : 
     587           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     588           0 :         pg_fatal("Unexpected result code %s from second pipeline sync",
     589             :                  PQresStatus(PQresultStatus(res)));
     590             : 
     591             :     /* We're still in pipeline mode ... */
     592           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     593           0 :         pg_fatal("Fell out of pipeline mode somehow");
     594             : 
     595             :     /* until we end it, which we can safely do now */
     596           2 :     if (PQexitPipelineMode(conn) != 1)
     597           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     598             :                  PQerrorMessage(conn));
     599             : 
     600           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     601           0 :         pg_fatal("exiting pipeline mode didn't seem to work");
     602             : 
     603           2 :     fprintf(stderr, "ok\n");
     604           2 : }
     605             : 
     606             : /*
     607             :  * Test behavior when a pipeline dispatches a number of commands that are
     608             :  * not flushed by a sync point.
     609             :  */
     610             : static void
     611           2 : test_nosync(PGconn *conn)
     612             : {
     613           2 :     int         numqueries = 10;
     614           2 :     int         results = 0;
     615           2 :     int         sock = PQsocket(conn);
     616             : 
     617           2 :     fprintf(stderr, "nosync... ");
     618             : 
     619           2 :     if (sock < 0)
     620           0 :         pg_fatal("invalid socket");
     621             : 
     622           2 :     if (PQenterPipelineMode(conn) != 1)
     623           0 :         pg_fatal("could not enter pipeline mode");
     624          22 :     for (int i = 0; i < numqueries; i++)
     625             :     {
     626             :         fd_set      input_mask;
     627             :         struct timeval tv;
     628             : 
     629          20 :         if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
     630             :                               0, NULL, NULL, NULL, NULL, 0) != 1)
     631           0 :             pg_fatal("error sending select: %s", PQerrorMessage(conn));
     632          20 :         PQflush(conn);
     633             : 
     634             :         /*
     635             :          * If the server has written anything to us, read (some of) it now.
     636             :          */
     637          20 :         FD_ZERO(&input_mask);
     638          20 :         FD_SET(sock, &input_mask);
     639          20 :         tv.tv_sec = 0;
     640          20 :         tv.tv_usec = 0;
     641          20 :         if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
     642             :         {
     643           0 :             fprintf(stderr, "select() failed: %m\n");
     644           0 :             exit_nicely(conn);
     645             :         }
     646          20 :         if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
     647           0 :             pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
     648             :     }
     649             : 
     650             :     /* tell server to flush its output buffer */
     651           2 :     if (PQsendFlushRequest(conn) != 1)
     652           0 :         pg_fatal("failed to send flush request");
     653           2 :     PQflush(conn);
     654             : 
     655             :     /* Now read all results */
     656             :     for (;;)
     657          18 :     {
     658             :         PGresult   *res;
     659             : 
     660          20 :         res = PQgetResult(conn);
     661             : 
     662             :         /* NULL results are only expected after TUPLES_OK */
     663          20 :         if (res == NULL)
     664           0 :             pg_fatal("got unexpected NULL result after %d results", results);
     665             : 
     666             :         /* We expect exactly one TUPLES_OK result for each query we sent */
     667          20 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
     668             :         {
     669             :             PGresult   *res2;
     670             : 
     671             :             /* and one NULL result should follow each */
     672          20 :             res2 = PQgetResult(conn);
     673          20 :             if (res2 != NULL)
     674           0 :                 pg_fatal("expected NULL, got %s",
     675             :                          PQresStatus(PQresultStatus(res2)));
     676          20 :             PQclear(res);
     677          20 :             results++;
     678             : 
     679             :             /* if we're done, we're done */
     680          20 :             if (results == numqueries)
     681           2 :                 break;
     682             : 
     683          18 :             continue;
     684             :         }
     685             : 
     686             :         /* anything else is unexpected */
     687           0 :         pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
     688             :     }
     689             : 
     690           2 :     fprintf(stderr, "ok\n");
     691           2 : }
     692             : 
     693             : /*
     694             :  * When an operation in a pipeline fails the rest of the pipeline is flushed. We
     695             :  * still have to get results for each pipeline item, but the item will just be
     696             :  * a PGRES_PIPELINE_ABORTED code.
     697             :  *
     698             :  * This intentionally doesn't use a transaction to wrap the pipeline. You should
     699             :  * usually use an xact, but in this case we want to observe the effects of each
     700             :  * statement.
     701             :  */
     702             : static void
     703           2 : test_pipeline_abort(PGconn *conn)
     704             : {
     705           2 :     PGresult   *res = NULL;
     706           2 :     const char *dummy_params[1] = {"1"};
     707           2 :     Oid         dummy_param_oids[1] = {INT4OID};
     708             :     int         i;
     709             :     int         gotrows;
     710             :     bool        goterror;
     711             : 
     712           2 :     fprintf(stderr, "aborted pipeline... ");
     713             : 
     714           2 :     res = PQexec(conn, drop_table_sql);
     715           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     716           0 :         pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
     717             : 
     718           2 :     res = PQexec(conn, create_table_sql);
     719           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     720           0 :         pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
     721             : 
     722             :     /*
     723             :      * Queue up a couple of small pipelines and process each without returning
     724             :      * to command mode first. Make sure the second operation in the first
     725             :      * pipeline ERRORs.
     726             :      */
     727           2 :     if (PQenterPipelineMode(conn) != 1)
     728           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     729             : 
     730           2 :     dummy_params[0] = "1";
     731           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     732             :                           dummy_params, NULL, NULL, 0) != 1)
     733           0 :         pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
     734             : 
     735           2 :     if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
     736             :                           1, dummy_param_oids, dummy_params,
     737             :                           NULL, NULL, 0) != 1)
     738           0 :         pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
     739             : 
     740           2 :     dummy_params[0] = "2";
     741           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     742             :                           dummy_params, NULL, NULL, 0) != 1)
     743           0 :         pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
     744             : 
     745           2 :     if (PQpipelineSync(conn) != 1)
     746           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     747             : 
     748           2 :     dummy_params[0] = "3";
     749           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     750             :                           dummy_params, NULL, NULL, 0) != 1)
     751           0 :         pg_fatal("dispatching second-pipeline insert failed: %s",
     752             :                  PQerrorMessage(conn));
     753             : 
     754           2 :     if (PQpipelineSync(conn) != 1)
     755           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     756             : 
     757             :     /*
     758             :      * OK, start processing the pipeline results.
     759             :      *
     760             :      * We should get a command-ok for the first query, then a fatal error and
     761             :      * a pipeline aborted message for the second insert, a pipeline-end, then
     762             :      * a command-ok and a pipeline-ok for the second pipeline operation.
     763             :      */
     764           2 :     res = PQgetResult(conn);
     765           2 :     if (res == NULL)
     766           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     767           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     768           0 :         pg_fatal("Unexpected result status %s: %s",
     769             :                  PQresStatus(PQresultStatus(res)),
     770             :                  PQresultErrorMessage(res));
     771           2 :     PQclear(res);
     772             : 
     773             :     /* NULL result to signal end-of-results for this command */
     774           2 :     if ((res = PQgetResult(conn)) != NULL)
     775           0 :         pg_fatal("Expected null result, got %s",
     776             :                  PQresStatus(PQresultStatus(res)));
     777             : 
     778             :     /* Second query caused error, so we expect an error next */
     779           2 :     res = PQgetResult(conn);
     780           2 :     if (res == NULL)
     781           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     782           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
     783           0 :         pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
     784             :                  PQresStatus(PQresultStatus(res)));
     785           2 :     PQclear(res);
     786             : 
     787             :     /* NULL result to signal end-of-results for this command */
     788           2 :     if ((res = PQgetResult(conn)) != NULL)
     789           0 :         pg_fatal("Expected null result, got %s",
     790             :                  PQresStatus(PQresultStatus(res)));
     791             : 
     792             :     /*
     793             :      * pipeline should now be aborted.
     794             :      *
     795             :      * Note that we could still queue more queries at this point if we wanted;
     796             :      * they'd get added to a new third pipeline since we've already sent a
     797             :      * second. The aborted flag relates only to the pipeline being received.
     798             :      */
     799           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
     800           0 :         pg_fatal("pipeline should be flagged as aborted but isn't");
     801             : 
     802             :     /* third query in pipeline, the second insert */
     803           2 :     res = PQgetResult(conn);
     804           2 :     if (res == NULL)
     805           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     806           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
     807           0 :         pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
     808             :                  PQresStatus(PQresultStatus(res)));
     809           2 :     PQclear(res);
     810             : 
     811             :     /* NULL result to signal end-of-results for this command */
     812           2 :     if ((res = PQgetResult(conn)) != NULL)
     813           0 :         pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
     814             : 
     815           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
     816           0 :         pg_fatal("pipeline should be flagged as aborted but isn't");
     817             : 
     818             :     /* Ensure we're still in pipeline */
     819           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     820           0 :         pg_fatal("Fell out of pipeline mode somehow");
     821             : 
     822             :     /*
     823             :      * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
     824             :      *
     825             :      * (This is so clients know to start processing results normally again and
     826             :      * can tell the difference between skipped commands and the sync.)
     827             :      */
     828           2 :     res = PQgetResult(conn);
     829           2 :     if (res == NULL)
     830           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     831           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     832           0 :         pg_fatal("Unexpected result code from first pipeline sync\n"
     833             :                  "Expected PGRES_PIPELINE_SYNC, got %s",
     834             :                  PQresStatus(PQresultStatus(res)));
     835           2 :     PQclear(res);
     836             : 
     837           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
     838           0 :         pg_fatal("sync should've cleared the aborted flag but didn't");
     839             : 
     840             :     /* We're still in pipeline mode... */
     841           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     842           0 :         pg_fatal("Fell out of pipeline mode somehow");
     843             : 
     844             :     /* the insert from the second pipeline */
     845           2 :     res = PQgetResult(conn);
     846           2 :     if (res == NULL)
     847           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     848           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     849           0 :         pg_fatal("Unexpected result code %s from first item in second pipeline",
     850             :                  PQresStatus(PQresultStatus(res)));
     851           2 :     PQclear(res);
     852             : 
     853             :     /* Read the NULL result at the end of the command */
     854           2 :     if ((res = PQgetResult(conn)) != NULL)
     855           0 :         pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
     856             : 
     857             :     /* the second pipeline sync */
     858           2 :     if ((res = PQgetResult(conn)) == NULL)
     859           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     860           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     861           0 :         pg_fatal("Unexpected result code %s from second pipeline sync",
     862             :                  PQresStatus(PQresultStatus(res)));
     863           2 :     PQclear(res);
     864             : 
     865           2 :     if ((res = PQgetResult(conn)) != NULL)
     866           0 :         pg_fatal("Expected null result, got %s: %s",
     867             :                  PQresStatus(PQresultStatus(res)),
     868             :                  PQerrorMessage(conn));
     869             : 
     870             :     /* Try to send two queries in one command */
     871           2 :     if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
     872           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
     873           2 :     if (PQpipelineSync(conn) != 1)
     874           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     875           2 :     goterror = false;
     876           4 :     while ((res = PQgetResult(conn)) != NULL)
     877             :     {
     878           2 :         switch (PQresultStatus(res))
     879             :         {
     880           2 :             case PGRES_FATAL_ERROR:
     881           2 :                 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
     882           0 :                     pg_fatal("expected error about multiple commands, got %s",
     883             :                              PQerrorMessage(conn));
     884           2 :                 printf("got expected %s", PQerrorMessage(conn));
     885           2 :                 goterror = true;
     886           2 :                 break;
     887           0 :             default:
     888           0 :                 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
     889             :                 break;
     890             :         }
     891             :     }
     892           2 :     if (!goterror)
     893           0 :         pg_fatal("did not get cannot-insert-multiple-commands error");
     894           2 :     res = PQgetResult(conn);
     895           2 :     if (res == NULL)
     896           0 :         pg_fatal("got NULL result");
     897           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     898           0 :         pg_fatal("Unexpected result code %s from pipeline sync",
     899             :                  PQresStatus(PQresultStatus(res)));
     900           2 :     fprintf(stderr, "ok\n");
     901             : 
     902             :     /* Test single-row mode with an error partways */
     903           2 :     if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
     904             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
     905           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
     906           2 :     if (PQpipelineSync(conn) != 1)
     907           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     908           2 :     PQsetSingleRowMode(conn);
     909           2 :     goterror = false;
     910           2 :     gotrows = 0;
     911          10 :     while ((res = PQgetResult(conn)) != NULL)
     912             :     {
     913           8 :         switch (PQresultStatus(res))
     914             :         {
     915           6 :             case PGRES_SINGLE_TUPLE:
     916           6 :                 printf("got row: %s\n", PQgetvalue(res, 0, 0));
     917           6 :                 gotrows++;
     918           6 :                 break;
     919           2 :             case PGRES_FATAL_ERROR:
     920           2 :                 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
     921           0 :                     pg_fatal("expected division-by-zero, got: %s (%s)",
     922             :                              PQerrorMessage(conn),
     923             :                              PQresultErrorField(res, PG_DIAG_SQLSTATE));
     924           2 :                 printf("got expected division-by-zero\n");
     925           2 :                 goterror = true;
     926           2 :                 break;
     927           0 :             default:
     928           0 :                 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
     929             :         }
     930           8 :         PQclear(res);
     931             :     }
     932           2 :     if (!goterror)
     933           0 :         pg_fatal("did not get division-by-zero error");
     934           2 :     if (gotrows != 3)
     935           0 :         pg_fatal("did not get three rows");
     936             :     /* the third pipeline sync */
     937           2 :     if ((res = PQgetResult(conn)) == NULL)
     938           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     939           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     940           0 :         pg_fatal("Unexpected result code %s from third pipeline sync",
     941             :                  PQresStatus(PQresultStatus(res)));
     942           2 :     PQclear(res);
     943             : 
     944             :     /* We're still in pipeline mode... */
     945           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     946           0 :         pg_fatal("Fell out of pipeline mode somehow");
     947             : 
     948             :     /* until we end it, which we can safely do now */
     949           2 :     if (PQexitPipelineMode(conn) != 1)
     950           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     951             :                  PQerrorMessage(conn));
     952             : 
     953           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     954           0 :         pg_fatal("exiting pipeline mode didn't seem to work");
     955             : 
     956             :     /*-
     957             :      * Since we fired the pipelines off without a surrounding xact, the results
     958             :      * should be:
     959             :      *
     960             :      * - Implicit xact started by server around 1st pipeline
     961             :      * - First insert applied
     962             :      * - Second statement aborted xact
     963             :      * - Third insert skipped
     964             :      * - Sync rolled back first implicit xact
     965             :      * - Implicit xact created by server around 2nd pipeline
     966             :      * - insert applied from 2nd pipeline
     967             :      * - Sync commits 2nd xact
     968             :      *
     969             :      * So we should only have the value 3 that we inserted.
     970             :      */
     971           2 :     res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
     972             : 
     973           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     974           0 :         pg_fatal("Expected tuples, got %s: %s",
     975             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     976           2 :     if (PQntuples(res) != 1)
     977           0 :         pg_fatal("expected 1 result, got %d", PQntuples(res));
     978           4 :     for (i = 0; i < PQntuples(res); i++)
     979             :     {
     980           2 :         const char *val = PQgetvalue(res, i, 0);
     981             : 
     982           2 :         if (strcmp(val, "3") != 0)
     983           0 :             pg_fatal("expected only insert with value 3, got %s", val);
     984             :     }
     985             : 
     986           2 :     PQclear(res);
     987             : 
     988           2 :     fprintf(stderr, "ok\n");
     989           2 : }
     990             : 
     991             : /* State machine enum for test_pipelined_insert */
     992             : enum PipelineInsertStep
     993             : {
     994             :     BI_BEGIN_TX,
     995             :     BI_DROP_TABLE,
     996             :     BI_CREATE_TABLE,
     997             :     BI_PREPARE,
     998             :     BI_INSERT_ROWS,
     999             :     BI_COMMIT_TX,
    1000             :     BI_SYNC,
    1001             :     BI_DONE,
    1002             : };
    1003             : 
    1004             : static void
    1005           2 : test_pipelined_insert(PGconn *conn, int n_rows)
    1006             : {
    1007           2 :     Oid         insert_param_oids[2] = {INT4OID, INT8OID};
    1008             :     const char *insert_params[2];
    1009             :     char        insert_param_0[MAXINTLEN];
    1010             :     char        insert_param_1[MAXINT8LEN];
    1011           2 :     enum PipelineInsertStep send_step = BI_BEGIN_TX,
    1012           2 :                 recv_step = BI_BEGIN_TX;
    1013             :     int         rows_to_send,
    1014             :                 rows_to_receive;
    1015             : 
    1016           2 :     insert_params[0] = insert_param_0;
    1017           2 :     insert_params[1] = insert_param_1;
    1018             : 
    1019           2 :     rows_to_send = rows_to_receive = n_rows;
    1020             : 
    1021             :     /*
    1022             :      * Do a pipelined insert into a table created at the start of the pipeline
    1023             :      */
    1024           2 :     if (PQenterPipelineMode(conn) != 1)
    1025           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1026             : 
    1027           8 :     while (send_step != BI_PREPARE)
    1028             :     {
    1029             :         const char *sql;
    1030             : 
    1031           6 :         switch (send_step)
    1032             :         {
    1033           2 :             case BI_BEGIN_TX:
    1034           2 :                 sql = "BEGIN TRANSACTION";
    1035           2 :                 send_step = BI_DROP_TABLE;
    1036           2 :                 break;
    1037             : 
    1038           2 :             case BI_DROP_TABLE:
    1039           2 :                 sql = drop_table_sql;
    1040           2 :                 send_step = BI_CREATE_TABLE;
    1041           2 :                 break;
    1042             : 
    1043           2 :             case BI_CREATE_TABLE:
    1044           2 :                 sql = create_table_sql;
    1045           2 :                 send_step = BI_PREPARE;
    1046           2 :                 break;
    1047             : 
    1048           0 :             default:
    1049           0 :                 pg_fatal("invalid state");
    1050             :                 sql = NULL;     /* keep compiler quiet */
    1051             :         }
    1052             : 
    1053             :         pg_debug("sending: %s\n", sql);
    1054           6 :         if (PQsendQueryParams(conn, sql,
    1055             :                               0, NULL, NULL, NULL, NULL, 0) != 1)
    1056           0 :             pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
    1057             :     }
    1058             : 
    1059             :     Assert(send_step == BI_PREPARE);
    1060             :     pg_debug("sending: %s\n", insert_sql2);
    1061           2 :     if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
    1062           0 :         pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
    1063           2 :     send_step = BI_INSERT_ROWS;
    1064             : 
    1065             :     /*
    1066             :      * Now we start inserting. We'll be sending enough data that we could fill
    1067             :      * our output buffer, so to avoid deadlocking we need to enter nonblocking
    1068             :      * mode and consume input while we send more output. As results of each
    1069             :      * query are processed we should pop them to allow processing of the next
    1070             :      * query. There's no need to finish the pipeline before processing
    1071             :      * results.
    1072             :      */
    1073           2 :     if (PQsetnonblocking(conn, 1) != 0)
    1074           0 :         pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
    1075             : 
    1076       28018 :     while (recv_step != BI_DONE)
    1077             :     {
    1078             :         int         sock;
    1079             :         fd_set      input_mask;
    1080             :         fd_set      output_mask;
    1081             : 
    1082       28016 :         sock = PQsocket(conn);
    1083             : 
    1084       28016 :         if (sock < 0)
    1085           0 :             break;              /* shouldn't happen */
    1086             : 
    1087       28016 :         FD_ZERO(&input_mask);
    1088       28016 :         FD_SET(sock, &input_mask);
    1089       28016 :         FD_ZERO(&output_mask);
    1090       28016 :         FD_SET(sock, &output_mask);
    1091             : 
    1092       28016 :         if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
    1093             :         {
    1094           0 :             fprintf(stderr, "select() failed: %m\n");
    1095           0 :             exit_nicely(conn);
    1096             :         }
    1097             : 
    1098             :         /*
    1099             :          * Process any results, so we keep the server's output buffer free
    1100             :          * flowing and it can continue to process input
    1101             :          */
    1102       28016 :         if (FD_ISSET(sock, &input_mask))
    1103             :         {
    1104           4 :             PQconsumeInput(conn);
    1105             : 
    1106             :             /* Read until we'd block if we tried to read */
    1107        2826 :             while (!PQisBusy(conn) && recv_step < BI_DONE)
    1108             :             {
    1109             :                 PGresult   *res;
    1110        2822 :                 const char *cmdtag = "";
    1111        2822 :                 const char *description = "";
    1112             :                 int         status;
    1113             : 
    1114             :                 /*
    1115             :                  * Read next result.  If no more results from this query,
    1116             :                  * advance to the next query
    1117             :                  */
    1118        2822 :                 res = PQgetResult(conn);
    1119        2822 :                 if (res == NULL)
    1120        1410 :                     continue;
    1121             : 
    1122        1412 :                 status = PGRES_COMMAND_OK;
    1123        1412 :                 switch (recv_step)
    1124             :                 {
    1125           2 :                     case BI_BEGIN_TX:
    1126           2 :                         cmdtag = "BEGIN";
    1127           2 :                         recv_step++;
    1128           2 :                         break;
    1129           2 :                     case BI_DROP_TABLE:
    1130           2 :                         cmdtag = "DROP TABLE";
    1131           2 :                         recv_step++;
    1132           2 :                         break;
    1133           2 :                     case BI_CREATE_TABLE:
    1134           2 :                         cmdtag = "CREATE TABLE";
    1135           2 :                         recv_step++;
    1136           2 :                         break;
    1137           2 :                     case BI_PREPARE:
    1138           2 :                         cmdtag = "";
    1139           2 :                         description = "PREPARE";
    1140           2 :                         recv_step++;
    1141           2 :                         break;
    1142        1400 :                     case BI_INSERT_ROWS:
    1143        1400 :                         cmdtag = "INSERT";
    1144        1400 :                         rows_to_receive--;
    1145        1400 :                         if (rows_to_receive == 0)
    1146           2 :                             recv_step++;
    1147        1400 :                         break;
    1148           2 :                     case BI_COMMIT_TX:
    1149           2 :                         cmdtag = "COMMIT";
    1150           2 :                         recv_step++;
    1151           2 :                         break;
    1152           2 :                     case BI_SYNC:
    1153           2 :                         cmdtag = "";
    1154           2 :                         description = "SYNC";
    1155           2 :                         status = PGRES_PIPELINE_SYNC;
    1156           2 :                         recv_step++;
    1157           2 :                         break;
    1158           0 :                     case BI_DONE:
    1159             :                         /* unreachable */
    1160           0 :                         pg_fatal("unreachable state");
    1161             :                 }
    1162             : 
    1163        1412 :                 if (PQresultStatus(res) != status)
    1164           0 :                     pg_fatal("%s reported status %s, expected %s\n"
    1165             :                              "Error message: \"%s\"",
    1166             :                              description, PQresStatus(PQresultStatus(res)),
    1167             :                              PQresStatus(status), PQerrorMessage(conn));
    1168             : 
    1169        1412 :                 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
    1170           0 :                     pg_fatal("%s expected command tag '%s', got '%s'",
    1171             :                              description, cmdtag, PQcmdStatus(res));
    1172             : 
    1173             :                 pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
    1174             : 
    1175        1412 :                 PQclear(res);
    1176             :             }
    1177             :         }
    1178             : 
    1179             :         /* Write more rows and/or the end pipeline message, if needed */
    1180       28016 :         if (FD_ISSET(sock, &output_mask))
    1181             :         {
    1182       28014 :             PQflush(conn);
    1183             : 
    1184       28014 :             if (send_step == BI_INSERT_ROWS)
    1185             :             {
    1186        1400 :                 snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
    1187             :                 /* use up some buffer space with a wide value */
    1188        1400 :                 snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
    1189             : 
    1190        1400 :                 if (PQsendQueryPrepared(conn, "my_insert",
    1191             :                                         2, insert_params, NULL, NULL, 0) == 1)
    1192             :                 {
    1193             :                     pg_debug("sent row %d\n", rows_to_send);
    1194             : 
    1195        1400 :                     rows_to_send--;
    1196        1400 :                     if (rows_to_send == 0)
    1197           2 :                         send_step++;
    1198             :                 }
    1199             :                 else
    1200             :                 {
    1201             :                     /*
    1202             :                      * in nonblocking mode, so it's OK for an insert to fail
    1203             :                      * to send
    1204             :                      */
    1205           0 :                     fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
    1206             :                             rows_to_send, PQerrorMessage(conn));
    1207             :                 }
    1208             :             }
    1209       26614 :             else if (send_step == BI_COMMIT_TX)
    1210             :             {
    1211           2 :                 if (PQsendQueryParams(conn, "COMMIT",
    1212             :                                       0, NULL, NULL, NULL, NULL, 0) == 1)
    1213             :                 {
    1214             :                     pg_debug("sent COMMIT\n");
    1215           2 :                     send_step++;
    1216             :                 }
    1217             :                 else
    1218             :                 {
    1219           0 :                     fprintf(stderr, "WARNING: failed to send commit: %s\n",
    1220             :                             PQerrorMessage(conn));
    1221             :                 }
    1222             :             }
    1223       26612 :             else if (send_step == BI_SYNC)
    1224             :             {
    1225           2 :                 if (PQpipelineSync(conn) == 1)
    1226             :                 {
    1227           2 :                     fprintf(stdout, "pipeline sync sent\n");
    1228           2 :                     send_step++;
    1229             :                 }
    1230             :                 else
    1231             :                 {
    1232           0 :                     fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
    1233             :                             PQerrorMessage(conn));
    1234             :                 }
    1235             :             }
    1236             :         }
    1237             :     }
    1238             : 
    1239             :     /* We've got the sync message and the pipeline should be done */
    1240           2 :     if (PQexitPipelineMode(conn) != 1)
    1241           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
    1242             :                  PQerrorMessage(conn));
    1243             : 
    1244           2 :     if (PQsetnonblocking(conn, 0) != 0)
    1245           0 :         pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
    1246             : 
    1247           2 :     fprintf(stderr, "ok\n");
    1248           2 : }
    1249             : 
    1250             : static void
    1251           2 : test_prepared(PGconn *conn)
    1252             : {
    1253           2 :     PGresult   *res = NULL;
    1254           2 :     Oid         param_oids[1] = {INT4OID};
    1255             :     Oid         expected_oids[4];
    1256             :     Oid         typ;
    1257             : 
    1258           2 :     fprintf(stderr, "prepared... ");
    1259             : 
    1260           2 :     if (PQenterPipelineMode(conn) != 1)
    1261           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1262           2 :     if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
    1263             :                       "interval '1 sec'",
    1264             :                       1, param_oids) != 1)
    1265           0 :         pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
    1266           2 :     expected_oids[0] = INT4OID;
    1267           2 :     expected_oids[1] = TEXTOID;
    1268           2 :     expected_oids[2] = NUMERICOID;
    1269           2 :     expected_oids[3] = INTERVALOID;
    1270           2 :     if (PQsendDescribePrepared(conn, "select_one") != 1)
    1271           0 :         pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
    1272           2 :     if (PQpipelineSync(conn) != 1)
    1273           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1274             : 
    1275           2 :     res = PQgetResult(conn);
    1276           2 :     if (res == NULL)
    1277           0 :         pg_fatal("PQgetResult returned null");
    1278           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1279           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1280           2 :     PQclear(res);
    1281           2 :     res = PQgetResult(conn);
    1282           2 :     if (res != NULL)
    1283           0 :         pg_fatal("expected NULL result");
    1284             : 
    1285           2 :     res = PQgetResult(conn);
    1286           2 :     if (res == NULL)
    1287           0 :         pg_fatal("PQgetResult returned NULL");
    1288           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1289           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1290           2 :     if (PQnfields(res) != lengthof(expected_oids))
    1291           0 :         pg_fatal("expected %zu columns, got %d",
    1292             :                  lengthof(expected_oids), PQnfields(res));
    1293          10 :     for (int i = 0; i < PQnfields(res); i++)
    1294             :     {
    1295           8 :         typ = PQftype(res, i);
    1296           8 :         if (typ != expected_oids[i])
    1297           0 :             pg_fatal("field %d: expected type %u, got %u",
    1298             :                      i, expected_oids[i], typ);
    1299             :     }
    1300           2 :     PQclear(res);
    1301           2 :     res = PQgetResult(conn);
    1302           2 :     if (res != NULL)
    1303           0 :         pg_fatal("expected NULL result");
    1304             : 
    1305           2 :     res = PQgetResult(conn);
    1306           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1307           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
    1308             : 
    1309           2 :     fprintf(stderr, "closing statement..");
    1310           2 :     if (PQsendClosePrepared(conn, "select_one") != 1)
    1311           0 :         pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
    1312           2 :     if (PQpipelineSync(conn) != 1)
    1313           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1314             : 
    1315           2 :     res = PQgetResult(conn);
    1316           2 :     if (res == NULL)
    1317           0 :         pg_fatal("expected non-NULL result");
    1318           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1319           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1320           2 :     PQclear(res);
    1321           2 :     res = PQgetResult(conn);
    1322           2 :     if (res != NULL)
    1323           0 :         pg_fatal("expected NULL result");
    1324           2 :     res = PQgetResult(conn);
    1325           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1326           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
    1327             : 
    1328           2 :     if (PQexitPipelineMode(conn) != 1)
    1329           0 :         pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
    1330             : 
    1331             :     /* Now that it's closed we should get an error when describing */
    1332           2 :     res = PQdescribePrepared(conn, "select_one");
    1333           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
    1334           0 :         pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
    1335             : 
    1336             :     /*
    1337             :      * Also test the blocking close, this should not fail since closing a
    1338             :      * non-existent prepared statement is a no-op
    1339             :      */
    1340           2 :     res = PQclosePrepared(conn, "select_one");
    1341           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1342           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1343             : 
    1344           2 :     fprintf(stderr, "creating portal... ");
    1345           2 :     PQexec(conn, "BEGIN");
    1346           2 :     PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
    1347           2 :     PQenterPipelineMode(conn);
    1348           2 :     if (PQsendDescribePortal(conn, "cursor_one") != 1)
    1349           0 :         pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
    1350           2 :     if (PQpipelineSync(conn) != 1)
    1351           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1352           2 :     res = PQgetResult(conn);
    1353           2 :     if (res == NULL)
    1354           0 :         pg_fatal("PQgetResult returned null");
    1355           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1356           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1357             : 
    1358           2 :     typ = PQftype(res, 0);
    1359           2 :     if (typ != INT4OID)
    1360           0 :         pg_fatal("portal: expected type %u, got %u",
    1361             :                  INT4OID, typ);
    1362           2 :     PQclear(res);
    1363           2 :     res = PQgetResult(conn);
    1364           2 :     if (res != NULL)
    1365           0 :         pg_fatal("expected NULL result");
    1366           2 :     res = PQgetResult(conn);
    1367           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1368           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
    1369             : 
    1370           2 :     fprintf(stderr, "closing portal... ");
    1371           2 :     if (PQsendClosePortal(conn, "cursor_one") != 1)
    1372           0 :         pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
    1373           2 :     if (PQpipelineSync(conn) != 1)
    1374           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1375             : 
    1376           2 :     res = PQgetResult(conn);
    1377           2 :     if (res == NULL)
    1378           0 :         pg_fatal("expected non-NULL result");
    1379           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1380           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1381           2 :     PQclear(res);
    1382           2 :     res = PQgetResult(conn);
    1383           2 :     if (res != NULL)
    1384           0 :         pg_fatal("expected NULL result");
    1385           2 :     res = PQgetResult(conn);
    1386           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1387           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
    1388             : 
    1389           2 :     if (PQexitPipelineMode(conn) != 1)
    1390           0 :         pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
    1391             : 
    1392             :     /* Now that it's closed we should get an error when describing */
    1393           2 :     res = PQdescribePortal(conn, "cursor_one");
    1394           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
    1395           0 :         pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
    1396             : 
    1397             :     /*
    1398             :      * Also test the blocking close, this should not fail since closing a
    1399             :      * non-existent portal is a no-op
    1400             :      */
    1401           2 :     res = PQclosePortal(conn, "cursor_one");
    1402           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1403           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1404             : 
    1405           2 :     fprintf(stderr, "ok\n");
    1406           2 : }
    1407             : 
    1408             : /* Notice processor: print notices, and count how many we got */
    1409             : static void
    1410           2 : notice_processor(void *arg, const char *message)
    1411             : {
    1412           2 :     int        *n_notices = (int *) arg;
    1413             : 
    1414           2 :     (*n_notices)++;
    1415           2 :     fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
    1416           2 : }
    1417             : 
    1418             : /* Verify behavior in "idle" state */
    1419             : static void
    1420           2 : test_pipeline_idle(PGconn *conn)
    1421             : {
    1422             :     PGresult   *res;
    1423           2 :     int         n_notices = 0;
    1424             : 
    1425           2 :     fprintf(stderr, "\npipeline idle...\n");
    1426             : 
    1427           2 :     PQsetNoticeProcessor(conn, notice_processor, &n_notices);
    1428             : 
    1429             :     /* Try to exit pipeline mode in pipeline-idle state */
    1430           2 :     if (PQenterPipelineMode(conn) != 1)
    1431           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1432           2 :     if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1433           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1434           2 :     PQsendFlushRequest(conn);
    1435           2 :     res = PQgetResult(conn);
    1436           2 :     if (res == NULL)
    1437           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
    1438             :                  PQerrorMessage(conn));
    1439           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1440           0 :         pg_fatal("unexpected result code %s from first pipeline item",
    1441             :                  PQresStatus(PQresultStatus(res)));
    1442           2 :     PQclear(res);
    1443           2 :     res = PQgetResult(conn);
    1444           2 :     if (res != NULL)
    1445           0 :         pg_fatal("did not receive terminating NULL");
    1446           2 :     if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1447           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1448           2 :     if (PQexitPipelineMode(conn) == 1)
    1449           0 :         pg_fatal("exiting pipeline succeeded when it shouldn't");
    1450           2 :     if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
    1451             :                 strlen("cannot exit pipeline mode")) != 0)
    1452           0 :         pg_fatal("did not get expected error; got: %s",
    1453             :                  PQerrorMessage(conn));
    1454           2 :     PQsendFlushRequest(conn);
    1455           2 :     res = PQgetResult(conn);
    1456           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1457           0 :         pg_fatal("unexpected result code %s from second pipeline item",
    1458             :                  PQresStatus(PQresultStatus(res)));
    1459           2 :     PQclear(res);
    1460           2 :     res = PQgetResult(conn);
    1461           2 :     if (res != NULL)
    1462           0 :         pg_fatal("did not receive terminating NULL");
    1463           2 :     if (PQexitPipelineMode(conn) != 1)
    1464           0 :         pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
    1465             : 
    1466           2 :     if (n_notices > 0)
    1467           0 :         pg_fatal("got %d notice(s)", n_notices);
    1468           2 :     fprintf(stderr, "ok - 1\n");
    1469             : 
    1470             :     /* Have a WARNING in the middle of a resultset */
    1471           2 :     if (PQenterPipelineMode(conn) != 1)
    1472           0 :         pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
    1473           2 :     if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1474           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1475           2 :     PQsendFlushRequest(conn);
    1476           2 :     res = PQgetResult(conn);
    1477           2 :     if (res == NULL)
    1478           0 :         pg_fatal("unexpected NULL result received");
    1479           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1480           0 :         pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
    1481           2 :     if (PQexitPipelineMode(conn) != 1)
    1482           0 :         pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
    1483           2 :     fprintf(stderr, "ok - 2\n");
    1484           2 : }
    1485             : 
    1486             : static void
    1487           2 : test_simple_pipeline(PGconn *conn)
    1488             : {
    1489           2 :     PGresult   *res = NULL;
    1490           2 :     const char *dummy_params[1] = {"1"};
    1491           2 :     Oid         dummy_param_oids[1] = {INT4OID};
    1492             : 
    1493           2 :     fprintf(stderr, "simple pipeline... ");
    1494             : 
    1495             :     /*
    1496             :      * Enter pipeline mode and dispatch a set of operations, which we'll then
    1497             :      * process the results of as they come in.
    1498             :      *
    1499             :      * For a simple case we should be able to do this without interim
    1500             :      * processing of results since our output buffer will give us enough slush
    1501             :      * to work with and we won't block on sending. So blocking mode is fine.
    1502             :      */
    1503           2 :     if (PQisnonblocking(conn))
    1504           0 :         pg_fatal("Expected blocking connection mode");
    1505             : 
    1506           2 :     if (PQenterPipelineMode(conn) != 1)
    1507           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1508             : 
    1509           2 :     if (PQsendQueryParams(conn, "SELECT $1",
    1510             :                           1, dummy_param_oids, dummy_params,
    1511             :                           NULL, NULL, 0) != 1)
    1512           0 :         pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
    1513             : 
    1514           2 :     if (PQexitPipelineMode(conn) != 0)
    1515           0 :         pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
    1516             : 
    1517           2 :     if (PQpipelineSync(conn) != 1)
    1518           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1519             : 
    1520           2 :     res = PQgetResult(conn);
    1521           2 :     if (res == NULL)
    1522           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
    1523             :                  PQerrorMessage(conn));
    1524             : 
    1525           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1526           0 :         pg_fatal("Unexpected result code %s from first pipeline item",
    1527             :                  PQresStatus(PQresultStatus(res)));
    1528             : 
    1529           2 :     PQclear(res);
    1530           2 :     res = NULL;
    1531             : 
    1532           2 :     if (PQgetResult(conn) != NULL)
    1533           0 :         pg_fatal("PQgetResult returned something extra after first query result.");
    1534             : 
    1535             :     /*
    1536             :      * Even though we've processed the result there's still a sync to come and
    1537             :      * we can't exit pipeline mode yet
    1538             :      */
    1539           2 :     if (PQexitPipelineMode(conn) != 0)
    1540           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
    1541             : 
    1542           2 :     res = PQgetResult(conn);
    1543           2 :     if (res == NULL)
    1544           0 :         pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
    1545             :                  PQerrorMessage(conn));
    1546             : 
    1547           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1548           0 :         pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
    1549             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
    1550             : 
    1551           2 :     PQclear(res);
    1552           2 :     res = NULL;
    1553             : 
    1554           2 :     if (PQgetResult(conn) != NULL)
    1555           0 :         pg_fatal("PQgetResult returned something extra after pipeline end: %s",
    1556             :                  PQresStatus(PQresultStatus(res)));
    1557             : 
    1558             :     /* We're still in pipeline mode... */
    1559           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
    1560           0 :         pg_fatal("Fell out of pipeline mode somehow");
    1561             : 
    1562             :     /* ... until we end it, which we can safely do now */
    1563           2 :     if (PQexitPipelineMode(conn) != 1)
    1564           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
    1565             :                  PQerrorMessage(conn));
    1566             : 
    1567           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
    1568           0 :         pg_fatal("Exiting pipeline mode didn't seem to work");
    1569             : 
    1570           2 :     fprintf(stderr, "ok\n");
    1571           2 : }
    1572             : 
    1573             : static void
    1574           2 : test_singlerowmode(PGconn *conn)
    1575             : {
    1576             :     PGresult   *res;
    1577             :     int         i;
    1578           2 :     bool        pipeline_ended = false;
    1579             : 
    1580           2 :     if (PQenterPipelineMode(conn) != 1)
    1581           0 :         pg_fatal("failed to enter pipeline mode: %s",
    1582             :                  PQerrorMessage(conn));
    1583             : 
    1584             :     /* One series of three commands, using single-row mode for the first two. */
    1585           8 :     for (i = 0; i < 3; i++)
    1586             :     {
    1587             :         char       *param[1];
    1588             : 
    1589           6 :         param[0] = psprintf("%d", 44 + i);
    1590             : 
    1591           6 :         if (PQsendQueryParams(conn,
    1592             :                               "SELECT generate_series(42, $1)",
    1593             :                               1,
    1594             :                               NULL,
    1595             :                               (const char **) param,
    1596             :                               NULL,
    1597             :                               NULL,
    1598             :                               0) != 1)
    1599           0 :             pg_fatal("failed to send query: %s",
    1600             :                      PQerrorMessage(conn));
    1601           6 :         pfree(param[0]);
    1602             :     }
    1603           2 :     if (PQpipelineSync(conn) != 1)
    1604           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1605             : 
    1606          10 :     for (i = 0; !pipeline_ended; i++)
    1607             :     {
    1608           8 :         bool        first = true;
    1609             :         bool        saw_ending_tuplesok;
    1610           8 :         bool        isSingleTuple = false;
    1611             : 
    1612             :         /* Set single row mode for only first 2 SELECT queries */
    1613           8 :         if (i < 2)
    1614             :         {
    1615           4 :             if (PQsetSingleRowMode(conn) != 1)
    1616           0 :                 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
    1617             :         }
    1618             : 
    1619             :         /* Consume rows for this query */
    1620           8 :         saw_ending_tuplesok = false;
    1621          28 :         while ((res = PQgetResult(conn)) != NULL)
    1622             :         {
    1623          22 :             ExecStatusType est = PQresultStatus(res);
    1624             : 
    1625          22 :             if (est == PGRES_PIPELINE_SYNC)
    1626             :             {
    1627           2 :                 fprintf(stderr, "end of pipeline reached\n");
    1628           2 :                 pipeline_ended = true;
    1629           2 :                 PQclear(res);
    1630           2 :                 if (i != 3)
    1631           0 :                     pg_fatal("Expected three results, got %d", i);
    1632           2 :                 break;
    1633             :             }
    1634             : 
    1635             :             /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
    1636          20 :             if (first)
    1637             :             {
    1638           6 :                 if (i <= 1 && est != PGRES_SINGLE_TUPLE)
    1639           0 :                     pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
    1640             :                              i, PQresStatus(est));
    1641           6 :                 if (i >= 2 && est != PGRES_TUPLES_OK)
    1642           0 :                     pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
    1643             :                              i, PQresStatus(est));
    1644           6 :                 first = false;
    1645             :             }
    1646             : 
    1647          20 :             fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
    1648          20 :             switch (est)
    1649             :             {
    1650           6 :                 case PGRES_TUPLES_OK:
    1651           6 :                     fprintf(stderr, ", tuples: %d\n", PQntuples(res));
    1652           6 :                     saw_ending_tuplesok = true;
    1653           6 :                     if (isSingleTuple)
    1654             :                     {
    1655           4 :                         if (PQntuples(res) == 0)
    1656           4 :                             fprintf(stderr, "all tuples received in query %d\n", i);
    1657             :                         else
    1658           0 :                             pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
    1659             :                     }
    1660           6 :                     break;
    1661             : 
    1662          14 :                 case PGRES_SINGLE_TUPLE:
    1663          14 :                     isSingleTuple = true;
    1664          14 :                     fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
    1665          14 :                     break;
    1666             : 
    1667           0 :                 default:
    1668           0 :                     pg_fatal("unexpected");
    1669             :             }
    1670          20 :             PQclear(res);
    1671             :         }
    1672           8 :         if (!pipeline_ended && !saw_ending_tuplesok)
    1673           0 :             pg_fatal("didn't get expected terminating TUPLES_OK");
    1674             :     }
    1675             : 
    1676             :     /*
    1677             :      * Now issue one command, get its results in with single-row mode, then
    1678             :      * issue another command, and get its results in normal mode; make sure
    1679             :      * the single-row mode flag is reset as expected.
    1680             :      */
    1681           2 :     if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
    1682             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1683           0 :         pg_fatal("failed to send query: %s",
    1684             :                  PQerrorMessage(conn));
    1685           2 :     if (PQsendFlushRequest(conn) != 1)
    1686           0 :         pg_fatal("failed to send flush request");
    1687           2 :     if (PQsetSingleRowMode(conn) != 1)
    1688           0 :         pg_fatal("PQsetSingleRowMode() failed");
    1689           2 :     res = PQgetResult(conn);
    1690           2 :     if (res == NULL)
    1691           0 :         pg_fatal("unexpected NULL");
    1692           2 :     if (PQresultStatus(res) != PGRES_SINGLE_TUPLE)
    1693           0 :         pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
    1694             :                  PQresStatus(PQresultStatus(res)));
    1695           2 :     res = PQgetResult(conn);
    1696           2 :     if (res == NULL)
    1697           0 :         pg_fatal("unexpected NULL");
    1698           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1699           0 :         pg_fatal("Expected PGRES_TUPLES_OK, got %s",
    1700             :                  PQresStatus(PQresultStatus(res)));
    1701           2 :     if (PQgetResult(conn) != NULL)
    1702           0 :         pg_fatal("expected NULL result");
    1703             : 
    1704           2 :     if (PQsendQueryParams(conn, "SELECT 1",
    1705             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1706           0 :         pg_fatal("failed to send query: %s",
    1707             :                  PQerrorMessage(conn));
    1708           2 :     if (PQsendFlushRequest(conn) != 1)
    1709           0 :         pg_fatal("failed to send flush request");
    1710           2 :     res = PQgetResult(conn);
    1711           2 :     if (res == NULL)
    1712           0 :         pg_fatal("unexpected NULL");
    1713           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1714           0 :         pg_fatal("Expected PGRES_TUPLES_OK, got %s",
    1715             :                  PQresStatus(PQresultStatus(res)));
    1716           2 :     if (PQgetResult(conn) != NULL)
    1717           0 :         pg_fatal("expected NULL result");
    1718             : 
    1719             :     /*
    1720             :      * Try chunked mode as well; make sure that it correctly delivers a
    1721             :      * partial final chunk.
    1722             :      */
    1723           2 :     if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
    1724             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1725           0 :         pg_fatal("failed to send query: %s",
    1726             :                  PQerrorMessage(conn));
    1727           2 :     if (PQsendFlushRequest(conn) != 1)
    1728           0 :         pg_fatal("failed to send flush request");
    1729           2 :     if (PQsetChunkedRowsMode(conn, 3) != 1)
    1730           0 :         pg_fatal("PQsetChunkedRowsMode() failed");
    1731           2 :     res = PQgetResult(conn);
    1732           2 :     if (res == NULL)
    1733           0 :         pg_fatal("unexpected NULL");
    1734           2 :     if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
    1735           0 :         pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
    1736             :                  PQresStatus(PQresultStatus(res)),
    1737             :                  PQerrorMessage(conn));
    1738           2 :     if (PQntuples(res) != 3)
    1739           0 :         pg_fatal("Expected 3 rows, got %d", PQntuples(res));
    1740           2 :     res = PQgetResult(conn);
    1741           2 :     if (res == NULL)
    1742           0 :         pg_fatal("unexpected NULL");
    1743           2 :     if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
    1744           0 :         pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
    1745             :                  PQresStatus(PQresultStatus(res)));
    1746           2 :     if (PQntuples(res) != 2)
    1747           0 :         pg_fatal("Expected 2 rows, got %d", PQntuples(res));
    1748           2 :     res = PQgetResult(conn);
    1749           2 :     if (res == NULL)
    1750           0 :         pg_fatal("unexpected NULL");
    1751           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1752           0 :         pg_fatal("Expected PGRES_TUPLES_OK, got %s",
    1753             :                  PQresStatus(PQresultStatus(res)));
    1754           2 :     if (PQntuples(res) != 0)
    1755           0 :         pg_fatal("Expected 0 rows, got %d", PQntuples(res));
    1756           2 :     if (PQgetResult(conn) != NULL)
    1757           0 :         pg_fatal("expected NULL result");
    1758             : 
    1759           2 :     if (PQexitPipelineMode(conn) != 1)
    1760           0 :         pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
    1761             : 
    1762           2 :     fprintf(stderr, "ok\n");
    1763           2 : }
    1764             : 
    1765             : /*
    1766             :  * Simple test to verify that a pipeline is discarded as a whole when there's
    1767             :  * an error, ignoring transaction commands.
    1768             :  */
    1769             : static void
    1770           2 : test_transaction(PGconn *conn)
    1771             : {
    1772             :     PGresult   *res;
    1773             :     bool        expect_null;
    1774           2 :     int         num_syncs = 0;
    1775             : 
    1776           2 :     res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
    1777             :                  "CREATE TABLE pq_pipeline_tst (id int)");
    1778           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1779           0 :         pg_fatal("failed to create test table: %s",
    1780             :                  PQerrorMessage(conn));
    1781           2 :     PQclear(res);
    1782             : 
    1783           2 :     if (PQenterPipelineMode(conn) != 1)
    1784           0 :         pg_fatal("failed to enter pipeline mode: %s",
    1785             :                  PQerrorMessage(conn));
    1786           2 :     if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
    1787           0 :         pg_fatal("could not send prepare on pipeline: %s",
    1788             :                  PQerrorMessage(conn));
    1789             : 
    1790           2 :     if (PQsendQueryParams(conn,
    1791             :                           "BEGIN",
    1792             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1793           0 :         pg_fatal("failed to send query: %s",
    1794             :                  PQerrorMessage(conn));
    1795           2 :     if (PQsendQueryParams(conn,
    1796             :                           "SELECT 0/0",
    1797             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1798           0 :         pg_fatal("failed to send query: %s",
    1799             :                  PQerrorMessage(conn));
    1800             : 
    1801             :     /*
    1802             :      * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
    1803             :      * get out of the pipeline-aborted state first.
    1804             :      */
    1805           2 :     if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
    1806           0 :         pg_fatal("failed to execute prepared: %s",
    1807             :                  PQerrorMessage(conn));
    1808             : 
    1809             :     /* This insert fails because we're in pipeline-aborted state */
    1810           2 :     if (PQsendQueryParams(conn,
    1811             :                           "INSERT INTO pq_pipeline_tst VALUES (1)",
    1812             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1813           0 :         pg_fatal("failed to send query: %s",
    1814             :                  PQerrorMessage(conn));
    1815           2 :     if (PQpipelineSync(conn) != 1)
    1816           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1817           2 :     num_syncs++;
    1818             : 
    1819             :     /*
    1820             :      * This insert fails even though the pipeline got a SYNC, because we're in
    1821             :      * an aborted transaction
    1822             :      */
    1823           2 :     if (PQsendQueryParams(conn,
    1824             :                           "INSERT INTO pq_pipeline_tst VALUES (2)",
    1825             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1826           0 :         pg_fatal("failed to send query: %s",
    1827             :                  PQerrorMessage(conn));
    1828           2 :     if (PQpipelineSync(conn) != 1)
    1829           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1830           2 :     num_syncs++;
    1831             : 
    1832             :     /*
    1833             :      * Send ROLLBACK using prepared stmt. This one works because we just did
    1834             :      * PQpipelineSync above.
    1835             :      */
    1836           2 :     if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
    1837           0 :         pg_fatal("failed to execute prepared: %s",
    1838             :                  PQerrorMessage(conn));
    1839             : 
    1840             :     /*
    1841             :      * Now that we're out of a transaction and in pipeline-good mode, this
    1842             :      * insert works
    1843             :      */
    1844           2 :     if (PQsendQueryParams(conn,
    1845             :                           "INSERT INTO pq_pipeline_tst VALUES (3)",
    1846             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1847           0 :         pg_fatal("failed to send query: %s",
    1848             :                  PQerrorMessage(conn));
    1849             :     /* Send two syncs now -- match up to SYNC messages below */
    1850           2 :     if (PQpipelineSync(conn) != 1)
    1851           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1852           2 :     num_syncs++;
    1853           2 :     if (PQpipelineSync(conn) != 1)
    1854           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1855           2 :     num_syncs++;
    1856             : 
    1857           2 :     expect_null = false;
    1858           2 :     for (int i = 0;; i++)
    1859          38 :     {
    1860             :         ExecStatusType restype;
    1861             : 
    1862          40 :         res = PQgetResult(conn);
    1863          40 :         if (res == NULL)
    1864             :         {
    1865          16 :             printf("%d: got NULL result\n", i);
    1866          16 :             if (!expect_null)
    1867           0 :                 pg_fatal("did not expect NULL here");
    1868          16 :             expect_null = false;
    1869          16 :             continue;
    1870             :         }
    1871          24 :         restype = PQresultStatus(res);
    1872          24 :         printf("%d: got status %s", i, PQresStatus(restype));
    1873          24 :         if (expect_null)
    1874           0 :             pg_fatal("expected NULL");
    1875          24 :         if (restype == PGRES_FATAL_ERROR)
    1876           4 :             printf("; error: %s", PQerrorMessage(conn));
    1877          20 :         else if (restype == PGRES_PIPELINE_ABORTED)
    1878             :         {
    1879           4 :             printf(": command didn't run because pipeline aborted\n");
    1880             :         }
    1881             :         else
    1882          16 :             printf("\n");
    1883          24 :         PQclear(res);
    1884             : 
    1885          24 :         if (restype == PGRES_PIPELINE_SYNC)
    1886           8 :             num_syncs--;
    1887             :         else
    1888          16 :             expect_null = true;
    1889          24 :         if (num_syncs <= 0)
    1890           2 :             break;
    1891             :     }
    1892           2 :     if (PQgetResult(conn) != NULL)
    1893           0 :         pg_fatal("returned something extra after all the syncs: %s",
    1894             :                  PQresStatus(PQresultStatus(res)));
    1895             : 
    1896           2 :     if (PQexitPipelineMode(conn) != 1)
    1897           0 :         pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
    1898             : 
    1899             :     /* We expect to find one tuple containing the value "3" */
    1900           2 :     res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
    1901           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1902           0 :         pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
    1903           2 :     if (PQntuples(res) != 1)
    1904           0 :         pg_fatal("did not get 1 tuple");
    1905           2 :     if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
    1906           0 :         pg_fatal("did not get expected tuple");
    1907           2 :     PQclear(res);
    1908             : 
    1909           2 :     fprintf(stderr, "ok\n");
    1910           2 : }
    1911             : 
    1912             : /*
    1913             :  * In this test mode we send a stream of queries, with one in the middle
    1914             :  * causing an error.  Verify that we can still send some more after the
    1915             :  * error and have libpq work properly.
    1916             :  */
    1917             : static void
    1918           2 : test_uniqviol(PGconn *conn)
    1919             : {
    1920           2 :     int         sock = PQsocket(conn);
    1921             :     PGresult   *res;
    1922           2 :     Oid         paramTypes[2] = {INT8OID, INT8OID};
    1923             :     const char *paramValues[2];
    1924             :     char        paramValue0[MAXINT8LEN];
    1925             :     char        paramValue1[MAXINT8LEN];
    1926           2 :     int         ctr = 0;
    1927           2 :     int         numsent = 0;
    1928           2 :     int         results = 0;
    1929           2 :     bool        read_done = false;
    1930           2 :     bool        write_done = false;
    1931           2 :     bool        error_sent = false;
    1932           2 :     bool        got_error = false;
    1933           2 :     int         switched = 0;
    1934           2 :     int         socketful = 0;
    1935             :     fd_set      in_fds;
    1936             :     fd_set      out_fds;
    1937             : 
    1938           2 :     fprintf(stderr, "uniqviol ...");
    1939             : 
    1940           2 :     PQsetnonblocking(conn, 1);
    1941             : 
    1942           2 :     paramValues[0] = paramValue0;
    1943           2 :     paramValues[1] = paramValue1;
    1944           2 :     sprintf(paramValue1, "42");
    1945             : 
    1946           2 :     res = PQexec(conn, "drop table if exists ppln_uniqviol;"
    1947             :                  "create table ppln_uniqviol(id bigint primary key, idata bigint)");
    1948           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1949           0 :         pg_fatal("failed to create table: %s", PQerrorMessage(conn));
    1950             : 
    1951           2 :     res = PQexec(conn, "begin");
    1952           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1953           0 :         pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
    1954             : 
    1955           2 :     res = PQprepare(conn, "insertion",
    1956             :                     "insert into ppln_uniqviol values ($1, $2) returning id",
    1957             :                     2, paramTypes);
    1958           2 :     if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
    1959           0 :         pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
    1960             : 
    1961           2 :     if (PQenterPipelineMode(conn) != 1)
    1962           0 :         pg_fatal("failed to enter pipeline mode");
    1963             : 
    1964          16 :     while (!read_done)
    1965             :     {
    1966             :         /*
    1967             :          * Avoid deadlocks by reading everything the server has sent before
    1968             :          * sending anything.  (Special precaution is needed here to process
    1969             :          * PQisBusy before testing the socket for read-readiness, because the
    1970             :          * socket does not turn read-ready after "sending" queries in aborted
    1971             :          * pipeline mode.)
    1972             :          */
    1973        1212 :         while (PQisBusy(conn) == 0)
    1974             :         {
    1975             :             bool        new_error;
    1976             : 
    1977        1202 :             if (results >= numsent)
    1978             :             {
    1979           2 :                 if (write_done)
    1980           0 :                     read_done = true;
    1981           2 :                 break;
    1982             :             }
    1983             : 
    1984        1200 :             res = PQgetResult(conn);
    1985        1200 :             new_error = process_result(conn, res, results, numsent);
    1986        1200 :             if (new_error && got_error)
    1987           0 :                 pg_fatal("got two errors");
    1988        1200 :             got_error |= new_error;
    1989        1200 :             if (results++ >= numsent - 1)
    1990             :             {
    1991           4 :                 if (write_done)
    1992           2 :                     read_done = true;
    1993           4 :                 break;
    1994             :             }
    1995             :         }
    1996             : 
    1997          16 :         if (read_done)
    1998           2 :             break;
    1999             : 
    2000          14 :         FD_ZERO(&out_fds);
    2001          14 :         FD_SET(sock, &out_fds);
    2002             : 
    2003          14 :         FD_ZERO(&in_fds);
    2004          14 :         FD_SET(sock, &in_fds);
    2005             : 
    2006          14 :         if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
    2007             :         {
    2008           0 :             if (errno == EINTR)
    2009           0 :                 continue;
    2010           0 :             pg_fatal("select() failed: %m");
    2011             :         }
    2012             : 
    2013          14 :         if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
    2014           0 :             pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
    2015             : 
    2016             :         /*
    2017             :          * If the socket is writable and we haven't finished sending queries,
    2018             :          * send some.
    2019             :          */
    2020          14 :         if (!write_done && FD_ISSET(sock, &out_fds))
    2021             :         {
    2022             :             for (;;)
    2023        1194 :             {
    2024             :                 int         flush;
    2025             : 
    2026             :                 /*
    2027             :                  * provoke uniqueness violation exactly once after having
    2028             :                  * switched to read mode.
    2029             :                  */
    2030        1200 :                 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
    2031             :                 {
    2032           2 :                     sprintf(paramValue0, "%d", numsent / 2);
    2033           2 :                     fprintf(stderr, "E");
    2034           2 :                     error_sent = true;
    2035             :                 }
    2036             :                 else
    2037             :                 {
    2038        1198 :                     fprintf(stderr, ".");
    2039        1198 :                     sprintf(paramValue0, "%d", ctr++);
    2040             :                 }
    2041             : 
    2042        1200 :                 if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
    2043           0 :                     pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
    2044        1200 :                 numsent++;
    2045             : 
    2046             :                 /* Are we done writing? */
    2047        1200 :                 if (socketful != 0 && numsent % socketful == 42 && error_sent)
    2048             :                 {
    2049           2 :                     if (PQsendFlushRequest(conn) != 1)
    2050           0 :                         pg_fatal("failed to send flush request");
    2051           2 :                     write_done = true;
    2052           2 :                     fprintf(stderr, "\ndone writing\n");
    2053           2 :                     PQflush(conn);
    2054           2 :                     break;
    2055             :                 }
    2056             : 
    2057             :                 /* is the outgoing socket full? */
    2058        1198 :                 flush = PQflush(conn);
    2059        1198 :                 if (flush == -1)
    2060           0 :                     pg_fatal("failed to flush: %s", PQerrorMessage(conn));
    2061        1198 :                 if (flush == 1)
    2062             :                 {
    2063           4 :                     if (socketful == 0)
    2064           2 :                         socketful = numsent;
    2065           4 :                     fprintf(stderr, "\nswitch to reading\n");
    2066           4 :                     switched++;
    2067           4 :                     break;
    2068             :                 }
    2069             :             }
    2070             :         }
    2071             :     }
    2072             : 
    2073           2 :     if (!got_error)
    2074           0 :         pg_fatal("did not get expected error");
    2075             : 
    2076           2 :     fprintf(stderr, "ok\n");
    2077           2 : }
    2078             : 
    2079             : /*
    2080             :  * Subroutine for test_uniqviol; given a PGresult, print it out and consume
    2081             :  * the expected NULL that should follow it.
    2082             :  *
    2083             :  * Returns true if we read a fatal error message, otherwise false.
    2084             :  */
    2085             : static bool
    2086        1200 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
    2087             : {
    2088             :     PGresult   *res2;
    2089        1200 :     bool        got_error = false;
    2090             : 
    2091        1200 :     if (res == NULL)
    2092           0 :         pg_fatal("got unexpected NULL");
    2093             : 
    2094        1200 :     switch (PQresultStatus(res))
    2095             :     {
    2096           2 :         case PGRES_FATAL_ERROR:
    2097           2 :             got_error = true;
    2098           2 :             fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
    2099           2 :             PQclear(res);
    2100             : 
    2101           2 :             res2 = PQgetResult(conn);
    2102           2 :             if (res2 != NULL)
    2103           0 :                 pg_fatal("expected NULL, got %s",
    2104             :                          PQresStatus(PQresultStatus(res2)));
    2105           2 :             break;
    2106             : 
    2107         836 :         case PGRES_TUPLES_OK:
    2108         836 :             fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
    2109         836 :             PQclear(res);
    2110             : 
    2111         836 :             res2 = PQgetResult(conn);
    2112         836 :             if (res2 != NULL)
    2113           0 :                 pg_fatal("expected NULL, got %s",
    2114             :                          PQresStatus(PQresultStatus(res2)));
    2115         836 :             break;
    2116             : 
    2117         362 :         case PGRES_PIPELINE_ABORTED:
    2118         362 :             fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
    2119         362 :             res2 = PQgetResult(conn);
    2120         362 :             if (res2 != NULL)
    2121           0 :                 pg_fatal("expected NULL, got %s",
    2122             :                          PQresStatus(PQresultStatus(res2)));
    2123         362 :             break;
    2124             : 
    2125           0 :         default:
    2126           0 :             pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
    2127             :     }
    2128             : 
    2129        1200 :     return got_error;
    2130             : }
    2131             : 
    2132             : 
    2133             : static void
    2134           0 : usage(const char *progname)
    2135             : {
    2136           0 :     fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
    2137           0 :     fprintf(stderr, "Usage:\n");
    2138           0 :     fprintf(stderr, "  %s [OPTION] tests\n", progname);
    2139           0 :     fprintf(stderr, "  %s [OPTION] TESTNAME [CONNINFO]\n", progname);
    2140           0 :     fprintf(stderr, "\nOptions:\n");
    2141           0 :     fprintf(stderr, "  -t TRACEFILE       generate a libpq trace to TRACEFILE\n");
    2142           0 :     fprintf(stderr, "  -r NUMROWS         use NUMROWS as the test size\n");
    2143           0 : }
    2144             : 
    2145             : static void
    2146           2 : print_test_list(void)
    2147             : {
    2148           2 :     printf("cancel\n");
    2149           2 :     printf("disallowed_in_pipeline\n");
    2150           2 :     printf("multi_pipelines\n");
    2151           2 :     printf("nosync\n");
    2152           2 :     printf("pipeline_abort\n");
    2153           2 :     printf("pipeline_idle\n");
    2154           2 :     printf("pipelined_insert\n");
    2155           2 :     printf("prepared\n");
    2156           2 :     printf("simple_pipeline\n");
    2157           2 :     printf("singlerow\n");
    2158           2 :     printf("transaction\n");
    2159           2 :     printf("uniqviol\n");
    2160           2 : }
    2161             : 
    2162             : int
    2163          26 : main(int argc, char **argv)
    2164             : {
    2165          26 :     const char *conninfo = "";
    2166             :     PGconn     *conn;
    2167             :     FILE       *trace;
    2168             :     char       *testname;
    2169          26 :     int         numrows = 10000;
    2170             :     PGresult   *res;
    2171             :     int         c;
    2172             : 
    2173          94 :     while ((c = getopt(argc, argv, "r:t:")) != -1)
    2174             :     {
    2175          42 :         switch (c)
    2176             :         {
    2177          24 :             case 'r':           /* numrows */
    2178          24 :                 errno = 0;
    2179          24 :                 numrows = strtol(optarg, NULL, 10);
    2180          24 :                 if (errno != 0 || numrows <= 0)
    2181             :                 {
    2182           0 :                     fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
    2183             :                             optarg);
    2184           0 :                     exit(1);
    2185             :                 }
    2186          24 :                 break;
    2187          18 :             case 't':           /* trace file */
    2188          18 :                 tracefile = pg_strdup(optarg);
    2189          18 :                 break;
    2190             :         }
    2191          68 :     }
    2192             : 
    2193          26 :     if (optind < argc)
    2194             :     {
    2195          26 :         testname = pg_strdup(argv[optind]);
    2196          26 :         optind++;
    2197             :     }
    2198             :     else
    2199             :     {
    2200           0 :         usage(argv[0]);
    2201           0 :         exit(1);
    2202             :     }
    2203             : 
    2204          26 :     if (strcmp(testname, "tests") == 0)
    2205             :     {
    2206           2 :         print_test_list();
    2207           2 :         exit(0);
    2208             :     }
    2209             : 
    2210          24 :     if (optind < argc)
    2211             :     {
    2212          24 :         conninfo = pg_strdup(argv[optind]);
    2213          24 :         optind++;
    2214             :     }
    2215             : 
    2216             :     /* Make a connection to the database */
    2217          24 :     conn = PQconnectdb(conninfo);
    2218          24 :     if (PQstatus(conn) != CONNECTION_OK)
    2219             :     {
    2220           0 :         fprintf(stderr, "Connection to database failed: %s\n",
    2221             :                 PQerrorMessage(conn));
    2222           0 :         exit_nicely(conn);
    2223             :     }
    2224             : 
    2225          24 :     res = PQexec(conn, "SET lc_messages TO \"C\"");
    2226          24 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2227           0 :         pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
    2228          24 :     res = PQexec(conn, "SET debug_parallel_query = off");
    2229          24 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2230           0 :         pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
    2231             : 
    2232             :     /* Set the trace file, if requested */
    2233          24 :     if (tracefile != NULL)
    2234             :     {
    2235          18 :         if (strcmp(tracefile, "-") == 0)
    2236           0 :             trace = stdout;
    2237             :         else
    2238          18 :             trace = fopen(tracefile, "w");
    2239          18 :         if (trace == NULL)
    2240           0 :             pg_fatal("could not open file \"%s\": %m", tracefile);
    2241             : 
    2242             :         /* Make it line-buffered */
    2243          18 :         setvbuf(trace, NULL, PG_IOLBF, 0);
    2244             : 
    2245          18 :         PQtrace(conn, trace);
    2246          18 :         PQsetTraceFlags(conn,
    2247             :                         PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
    2248             :     }
    2249             : 
    2250          24 :     if (strcmp(testname, "cancel") == 0)
    2251           2 :         test_cancel(conn);
    2252          22 :     else if (strcmp(testname, "disallowed_in_pipeline") == 0)
    2253           2 :         test_disallowed_in_pipeline(conn);
    2254          20 :     else if (strcmp(testname, "multi_pipelines") == 0)
    2255           2 :         test_multi_pipelines(conn);
    2256          18 :     else if (strcmp(testname, "nosync") == 0)
    2257           2 :         test_nosync(conn);
    2258          16 :     else if (strcmp(testname, "pipeline_abort") == 0)
    2259           2 :         test_pipeline_abort(conn);
    2260          14 :     else if (strcmp(testname, "pipeline_idle") == 0)
    2261           2 :         test_pipeline_idle(conn);
    2262          12 :     else if (strcmp(testname, "pipelined_insert") == 0)
    2263           2 :         test_pipelined_insert(conn, numrows);
    2264          10 :     else if (strcmp(testname, "prepared") == 0)
    2265           2 :         test_prepared(conn);
    2266           8 :     else if (strcmp(testname, "simple_pipeline") == 0)
    2267           2 :         test_simple_pipeline(conn);
    2268           6 :     else if (strcmp(testname, "singlerow") == 0)
    2269           2 :         test_singlerowmode(conn);
    2270           4 :     else if (strcmp(testname, "transaction") == 0)
    2271           2 :         test_transaction(conn);
    2272           2 :     else if (strcmp(testname, "uniqviol") == 0)
    2273           2 :         test_uniqviol(conn);
    2274             :     else
    2275             :     {
    2276           0 :         fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
    2277           0 :         exit(1);
    2278             :     }
    2279             : 
    2280             :     /* close the connection to the database and cleanup */
    2281          24 :     PQfinish(conn);
    2282          24 :     return 0;
    2283             : }

Generated by: LCOV version 1.14