LCOV - code coverage report
Current view: top level - src/test/modules/libpq_pipeline - libpq_pipeline.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 854 1118 76.4 %
Date: 2026-02-02 16:19:03 Functions: 24 27 88.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * libpq_pipeline.c
       4             :  *      Verify libpq pipeline execution functionality
       5             :  *
       6             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *      src/test/modules/libpq_pipeline/libpq_pipeline.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres_fe.h"
      17             : 
      18             : #include <sys/select.h>
      19             : #include <sys/time.h>
      20             : 
      21             : #include "catalog/pg_type_d.h"
      22             : #include "libpq-fe.h"
      23             : #include "pg_getopt.h"
      24             : 
      25             : 
      26             : static void exit_nicely(PGconn *conn);
      27             : pg_noreturn static void pg_fatal_impl(int line, const char *fmt,...)
      28             :             pg_attribute_printf(2, 3);
      29             : static bool process_result(PGconn *conn, PGresult *res, int results,
      30             :                            int numsent);
      31             : 
      32             : static const char *const progname = "libpq_pipeline";
      33             : 
      34             : /* Options and defaults */
      35             : static char *tracefile = NULL;  /* path to PQtrace() file */
      36             : 
      37             : 
      38             : #ifdef DEBUG_OUTPUT
      39             : #define pg_debug(...)  do { fprintf(stderr, __VA_ARGS__); } while (0)
      40             : #else
      41             : #define pg_debug(...)
      42             : #endif
      43             : 
      44             : static const char *const drop_table_sql =
      45             : "DROP TABLE IF EXISTS pq_pipeline_demo";
      46             : static const char *const create_table_sql =
      47             : "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
      48             : "int8filler int8);";
      49             : static const char *const insert_sql =
      50             : "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
      51             : static const char *const insert_sql2 =
      52             : "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
      53             : 
      54             : /* max char length of an int32/64, plus sign and null terminator */
      55             : #define MAXINTLEN 12
      56             : #define MAXINT8LEN 20
      57             : 
      58             : static void
      59           0 : exit_nicely(PGconn *conn)
      60             : {
      61           0 :     PQfinish(conn);
      62           0 :     exit(1);
      63             : }
      64             : 
      65             : /*
      66             :  * The following few functions are wrapped in macros to make the reported line
      67             :  * number in an error match the line number of the invocation.
      68             :  */
      69             : 
      70             : /*
      71             :  * Print an error to stderr and terminate the program.
      72             :  */
      73             : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
      74             : pg_noreturn static void
      75           0 : pg_fatal_impl(int line, const char *fmt,...)
      76             : {
      77             :     va_list     args;
      78             : 
      79           0 :     fflush(stdout);
      80             : 
      81           0 :     fprintf(stderr, "\n%s:%d: ", progname, line);
      82           0 :     va_start(args, fmt);
      83           0 :     vfprintf(stderr, fmt, args);
      84           0 :     va_end(args);
      85             :     Assert(fmt[strlen(fmt) - 1] != '\n');
      86           0 :     fprintf(stderr, "\n");
      87           0 :     exit(1);
      88             : }
      89             : 
      90             : /*
      91             :  * Check that libpq next returns a PGresult with the specified status,
      92             :  * returning the PGresult so that caller can perform additional checks.
      93             :  */
      94             : #define confirm_result_status(conn, status) confirm_result_status_impl(__LINE__, conn, status)
      95             : static PGresult *
      96         112 : confirm_result_status_impl(int line, PGconn *conn, ExecStatusType status)
      97             : {
      98             :     PGresult   *res;
      99             : 
     100         112 :     res = PQgetResult(conn);
     101         112 :     if (res == NULL)
     102           0 :         pg_fatal_impl(line, "PQgetResult returned null unexpectedly: %s",
     103             :                       PQerrorMessage(conn));
     104         112 :     if (PQresultStatus(res) != status)
     105           0 :         pg_fatal_impl(line, "PQgetResult returned status %s, expected %s: %s",
     106             :                       PQresStatus(PQresultStatus(res)),
     107             :                       PQresStatus(status),
     108             :                       PQerrorMessage(conn));
     109         112 :     return res;
     110             : }
     111             : 
     112             : /*
     113             :  * Check that libpq next returns a PGresult with the specified status,
     114             :  * then free the PGresult.
     115             :  */
     116             : #define consume_result_status(conn, status) consume_result_status_impl(__LINE__, conn, status)
     117             : static void
     118          78 : consume_result_status_impl(int line, PGconn *conn, ExecStatusType status)
     119             : {
     120             :     PGresult   *res;
     121             : 
     122          78 :     res = confirm_result_status_impl(line, conn, status);
     123          78 :     PQclear(res);
     124          78 : }
     125             : 
     126             : /*
     127             :  * Check that libpq next returns a null PGresult.
     128             :  */
     129             : #define consume_null_result(conn) consume_null_result_impl(__LINE__, conn)
     130             : static void
     131        1262 : consume_null_result_impl(int line, PGconn *conn)
     132             : {
     133             :     PGresult   *res;
     134             : 
     135        1262 :     res = PQgetResult(conn);
     136        1262 :     if (res != NULL)
     137           0 :         pg_fatal_impl(line, "expected NULL PGresult, got %s: %s",
     138             :                       PQresStatus(PQresultStatus(res)),
     139             :                       PQerrorMessage(conn));
     140        1262 : }
     141             : 
     142             : /*
     143             :  * Check that the query on the given connection got canceled.
     144             :  */
     145             : #define consume_query_cancel(conn) consume_query_cancel_impl(__LINE__, conn)
     146             : static void
     147          24 : consume_query_cancel_impl(int line, PGconn *conn)
     148             : {
     149             :     PGresult   *res;
     150             : 
     151          24 :     res = confirm_result_status_impl(line, conn, PGRES_FATAL_ERROR);
     152          24 :     if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
     153           0 :         pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
     154             :                       PQerrorMessage(conn));
     155          24 :     PQclear(res);
     156             : 
     157          24 :     while (PQisBusy(conn))
     158           0 :         PQconsumeInput(conn);
     159          24 : }
     160             : 
     161             : /*
     162             :  * Using monitorConn, query pg_stat_activity to see that the connection with
     163             :  * the given PID is either in the given state, or waiting on the given event
     164             :  * (only one of them can be given).
     165             :  */
     166             : static void
     167          48 : wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
     168             :                           char *state, char *event)
     169             : {
     170          48 :     const Oid   paramTypes[] = {INT4OID, TEXTOID};
     171             :     const char *paramValues[2];
     172          48 :     char       *pidstr = psprintf("%d", procpid);
     173             : 
     174             :     Assert((state == NULL) ^ (event == NULL));
     175             : 
     176          48 :     paramValues[0] = pidstr;
     177          48 :     paramValues[1] = state ? state : event;
     178             : 
     179             :     while (true)
     180           0 :     {
     181             :         PGresult   *res;
     182             :         char       *value;
     183             : 
     184          48 :         if (state != NULL)
     185          24 :             res = PQexecParams(monitorConn,
     186             :                                "SELECT count(*) FROM pg_stat_activity WHERE "
     187             :                                "pid = $1 AND state = $2",
     188             :                                2, paramTypes, paramValues, NULL, NULL, 0);
     189             :         else
     190          24 :             res = PQexecParams(monitorConn,
     191             :                                "SELECT count(*) FROM pg_stat_activity WHERE "
     192             :                                "pid = $1 AND wait_event = $2",
     193             :                                2, paramTypes, paramValues, NULL, NULL, 0);
     194             : 
     195          48 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     196           0 :             pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
     197          48 :         if (PQntuples(res) != 1)
     198           0 :             pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
     199          48 :         if (PQnfields(res) != 1)
     200           0 :             pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
     201          48 :         value = PQgetvalue(res, 0, 0);
     202          48 :         if (strcmp(value, "0") != 0)
     203             :         {
     204          48 :             PQclear(res);
     205          48 :             break;
     206             :         }
     207           0 :         PQclear(res);
     208             : 
     209             :         /* wait 10ms before polling again */
     210           0 :         pg_usleep(10000);
     211             :     }
     212             : 
     213          48 :     pfree(pidstr);
     214          48 : }
     215             : 
     216             : #define send_cancellable_query(conn, monitorConn) \
     217             :     send_cancellable_query_impl(__LINE__, conn, monitorConn)
     218             : static void
     219          24 : send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
     220             : {
     221             :     const char *env_wait;
     222          24 :     const Oid   paramTypes[1] = {INT4OID};
     223             : 
     224             :     /*
     225             :      * Wait for the connection to be idle, so that our check for an active
     226             :      * connection below is reliable, instead of possibly seeing an outdated
     227             :      * state.
     228             :      */
     229          24 :     wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
     230             : 
     231          24 :     env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
     232          24 :     if (env_wait == NULL)
     233          24 :         env_wait = "180";
     234             : 
     235          24 :     if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
     236             :                           &env_wait, NULL, NULL, 0) != 1)
     237           0 :         pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
     238             : 
     239             :     /*
     240             :      * Wait for the sleep to be active, because if the query is not running
     241             :      * yet, the cancel request that we send won't have any effect.
     242             :      */
     243          24 :     wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
     244          24 : }
     245             : 
     246             : /*
     247             :  * Create a new connection with the same conninfo as the given one.
     248             :  */
     249             : static PGconn *
     250           4 : copy_connection(PGconn *conn)
     251             : {
     252             :     PGconn     *copyConn;
     253           4 :     PQconninfoOption *opts = PQconninfo(conn);
     254             :     const char **keywords;
     255             :     const char **vals;
     256           4 :     int         nopts = 0;
     257             :     int         i;
     258             : 
     259         208 :     for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
     260         204 :         nopts++;
     261           4 :     nopts++;                    /* for the NULL terminator */
     262             : 
     263           4 :     keywords = pg_malloc(sizeof(char *) * nopts);
     264           4 :     vals = pg_malloc(sizeof(char *) * nopts);
     265             : 
     266           4 :     i = 0;
     267         208 :     for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
     268             :     {
     269         204 :         if (opt->val)
     270             :         {
     271          80 :             keywords[i] = opt->keyword;
     272          80 :             vals[i] = opt->val;
     273          80 :             i++;
     274             :         }
     275             :     }
     276           4 :     keywords[i] = vals[i] = NULL;
     277             : 
     278           4 :     copyConn = PQconnectdbParams(keywords, vals, false);
     279             : 
     280           4 :     if (PQstatus(copyConn) != CONNECTION_OK)
     281           0 :         pg_fatal("Connection to database failed: %s",
     282             :                  PQerrorMessage(copyConn));
     283             : 
     284           4 :     pfree(keywords);
     285           4 :     pfree(vals);
     286           4 :     PQconninfoFree(opts);
     287             : 
     288           4 :     return copyConn;
     289             : }
     290             : 
     291             : /*
     292             :  * Test query cancellation routines
     293             :  */
     294             : static void
     295           4 : test_cancel(PGconn *conn)
     296             : {
     297             :     PGcancel   *cancel;
     298             :     PGcancelConn *cancelConn;
     299             :     PGconn     *monitorConn;
     300             :     char        errorbuf[256];
     301             : 
     302           4 :     fprintf(stderr, "test cancellations... ");
     303             : 
     304           4 :     if (PQsetnonblocking(conn, 1) != 0)
     305           0 :         pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
     306             : 
     307             :     /*
     308             :      * Make a separate connection to the database to monitor the query on the
     309             :      * main connection.
     310             :      */
     311           4 :     monitorConn = copy_connection(conn);
     312             :     Assert(PQstatus(monitorConn) == CONNECTION_OK);
     313             : 
     314             :     /* test PQcancel */
     315           4 :     send_cancellable_query(conn, monitorConn);
     316           4 :     cancel = PQgetCancel(conn);
     317           4 :     if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
     318           0 :         pg_fatal("failed to run PQcancel: %s", errorbuf);
     319           4 :     consume_query_cancel(conn);
     320             : 
     321             :     /* PGcancel object can be reused for the next query */
     322           4 :     send_cancellable_query(conn, monitorConn);
     323           4 :     if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
     324           0 :         pg_fatal("failed to run PQcancel: %s", errorbuf);
     325           4 :     consume_query_cancel(conn);
     326             : 
     327           4 :     PQfreeCancel(cancel);
     328             : 
     329             :     /* test PQrequestCancel */
     330           4 :     send_cancellable_query(conn, monitorConn);
     331           4 :     if (!PQrequestCancel(conn))
     332           0 :         pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
     333           4 :     consume_query_cancel(conn);
     334             : 
     335             :     /* test PQcancelBlocking */
     336           4 :     send_cancellable_query(conn, monitorConn);
     337           4 :     cancelConn = PQcancelCreate(conn);
     338           4 :     if (!PQcancelBlocking(cancelConn))
     339           0 :         pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
     340           4 :     consume_query_cancel(conn);
     341           4 :     PQcancelFinish(cancelConn);
     342             : 
     343             :     /* test PQcancelCreate and then polling with PQcancelPoll */
     344           4 :     send_cancellable_query(conn, monitorConn);
     345           4 :     cancelConn = PQcancelCreate(conn);
     346           4 :     if (!PQcancelStart(cancelConn))
     347           0 :         pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     348             :     while (true)
     349           4 :     {
     350             :         struct timeval tv;
     351             :         fd_set      input_mask;
     352             :         fd_set      output_mask;
     353           8 :         PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
     354           8 :         int         sock = PQcancelSocket(cancelConn);
     355             : 
     356           8 :         if (pollres == PGRES_POLLING_OK)
     357           4 :             break;
     358             : 
     359          68 :         FD_ZERO(&input_mask);
     360          68 :         FD_ZERO(&output_mask);
     361           4 :         switch (pollres)
     362             :         {
     363           4 :             case PGRES_POLLING_READING:
     364             :                 pg_debug("polling for reads\n");
     365           4 :                 FD_SET(sock, &input_mask);
     366           4 :                 break;
     367           0 :             case PGRES_POLLING_WRITING:
     368             :                 pg_debug("polling for writes\n");
     369           0 :                 FD_SET(sock, &output_mask);
     370           0 :                 break;
     371           0 :             default:
     372           0 :                 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     373             :         }
     374             : 
     375           4 :         if (sock < 0)
     376           0 :             pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
     377             : 
     378           4 :         tv.tv_sec = 3;
     379           4 :         tv.tv_usec = 0;
     380             : 
     381             :         while (true)
     382             :         {
     383           4 :             if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
     384             :             {
     385           0 :                 if (errno == EINTR)
     386           0 :                     continue;
     387           0 :                 pg_fatal("select() failed: %m");
     388             :             }
     389           4 :             break;
     390             :         }
     391             :     }
     392           4 :     if (PQcancelStatus(cancelConn) != CONNECTION_OK)
     393           0 :         pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
     394           4 :     consume_query_cancel(conn);
     395             : 
     396             :     /*
     397             :      * test PQcancelReset works on the cancel connection and it can be reused
     398             :      * afterwards
     399             :      */
     400           4 :     PQcancelReset(cancelConn);
     401             : 
     402           4 :     send_cancellable_query(conn, monitorConn);
     403           4 :     if (!PQcancelStart(cancelConn))
     404           0 :         pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     405             :     while (true)
     406           4 :     {
     407             :         struct timeval tv;
     408             :         fd_set      input_mask;
     409             :         fd_set      output_mask;
     410           8 :         PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
     411           8 :         int         sock = PQcancelSocket(cancelConn);
     412             : 
     413           8 :         if (pollres == PGRES_POLLING_OK)
     414           4 :             break;
     415             : 
     416          68 :         FD_ZERO(&input_mask);
     417          68 :         FD_ZERO(&output_mask);
     418           4 :         switch (pollres)
     419             :         {
     420           4 :             case PGRES_POLLING_READING:
     421             :                 pg_debug("polling for reads\n");
     422           4 :                 FD_SET(sock, &input_mask);
     423           4 :                 break;
     424           0 :             case PGRES_POLLING_WRITING:
     425             :                 pg_debug("polling for writes\n");
     426           0 :                 FD_SET(sock, &output_mask);
     427           0 :                 break;
     428           0 :             default:
     429           0 :                 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     430             :         }
     431             : 
     432           4 :         if (sock < 0)
     433           0 :             pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
     434             : 
     435           4 :         tv.tv_sec = 3;
     436           4 :         tv.tv_usec = 0;
     437             : 
     438             :         while (true)
     439             :         {
     440           4 :             if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
     441             :             {
     442           0 :                 if (errno == EINTR)
     443           0 :                     continue;
     444           0 :                 pg_fatal("select() failed: %m");
     445             :             }
     446           4 :             break;
     447             :         }
     448             :     }
     449           4 :     if (PQcancelStatus(cancelConn) != CONNECTION_OK)
     450           0 :         pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
     451           4 :     consume_query_cancel(conn);
     452             : 
     453           4 :     PQcancelFinish(cancelConn);
     454           4 :     PQfinish(monitorConn);
     455             : 
     456           4 :     fprintf(stderr, "ok\n");
     457           4 : }
     458             : 
     459             : static void
     460           2 : test_disallowed_in_pipeline(PGconn *conn)
     461             : {
     462           2 :     PGresult   *res = NULL;
     463             : 
     464           2 :     fprintf(stderr, "test error cases... ");
     465             : 
     466           2 :     if (PQisnonblocking(conn))
     467           0 :         pg_fatal("Expected blocking connection mode");
     468             : 
     469           2 :     if (PQenterPipelineMode(conn) != 1)
     470           0 :         pg_fatal("Unable to enter pipeline mode");
     471             : 
     472           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     473           0 :         pg_fatal("Pipeline mode not activated properly");
     474             : 
     475             :     /* PQexec should fail in pipeline mode */
     476           2 :     res = PQexec(conn, "SELECT 1");
     477           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
     478           0 :         pg_fatal("PQexec should fail in pipeline mode but succeeded");
     479           2 :     if (strcmp(PQerrorMessage(conn),
     480             :                "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
     481           0 :         pg_fatal("did not get expected error message; got: \"%s\"",
     482             :                  PQerrorMessage(conn));
     483           2 :     PQclear(res);
     484             : 
     485             :     /* PQsendQuery should fail in pipeline mode */
     486           2 :     if (PQsendQuery(conn, "SELECT 1") != 0)
     487           0 :         pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
     488           2 :     if (strcmp(PQerrorMessage(conn),
     489             :                "PQsendQuery not allowed in pipeline mode\n") != 0)
     490           0 :         pg_fatal("did not get expected error message; got: \"%s\"",
     491             :                  PQerrorMessage(conn));
     492             : 
     493             :     /* Entering pipeline mode when already in pipeline mode is OK */
     494           2 :     if (PQenterPipelineMode(conn) != 1)
     495           0 :         pg_fatal("re-entering pipeline mode should be a no-op but failed");
     496             : 
     497           2 :     if (PQisBusy(conn) != 0)
     498           0 :         pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
     499             : 
     500             :     /* ok, back to normal command mode */
     501           2 :     if (PQexitPipelineMode(conn) != 1)
     502           0 :         pg_fatal("couldn't exit idle empty pipeline mode");
     503             : 
     504           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     505           0 :         pg_fatal("Pipeline mode not terminated properly");
     506             : 
     507             :     /* exiting pipeline mode when not in pipeline mode should be a no-op */
     508           2 :     if (PQexitPipelineMode(conn) != 1)
     509           0 :         pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
     510             : 
     511             :     /* can now PQexec again */
     512           2 :     res = PQexec(conn, "SELECT 1");
     513           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     514           0 :         pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
     515             :                  PQerrorMessage(conn));
     516           2 :     PQclear(res);
     517             : 
     518           2 :     fprintf(stderr, "ok\n");
     519           2 : }
     520             : 
     521             : static void
     522           2 : test_multi_pipelines(PGconn *conn)
     523             : {
     524           2 :     const char *dummy_params[1] = {"1"};
     525           2 :     Oid         dummy_param_oids[1] = {INT4OID};
     526             : 
     527           2 :     fprintf(stderr, "multi pipeline... ");
     528             : 
     529             :     /*
     530             :      * Queue up a couple of small pipelines and process each without returning
     531             :      * to command mode first.
     532             :      */
     533           2 :     if (PQenterPipelineMode(conn) != 1)
     534           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     535             : 
     536             :     /* first pipeline */
     537           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     538             :                           dummy_params, NULL, NULL, 0) != 1)
     539           0 :         pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
     540             : 
     541           2 :     if (PQpipelineSync(conn) != 1)
     542           0 :         pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
     543             : 
     544             :     /* second pipeline */
     545           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     546             :                           dummy_params, NULL, NULL, 0) != 1)
     547           0 :         pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
     548             : 
     549             :     /* Skip flushing once. */
     550           2 :     if (PQsendPipelineSync(conn) != 1)
     551           0 :         pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
     552             : 
     553             :     /* third pipeline */
     554           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     555             :                           dummy_params, NULL, NULL, 0) != 1)
     556           0 :         pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
     557             : 
     558           2 :     if (PQpipelineSync(conn) != 1)
     559           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     560             : 
     561             :     /* OK, start processing the results */
     562             : 
     563             :     /* first pipeline */
     564           2 :     consume_result_status(conn, PGRES_TUPLES_OK);
     565             : 
     566           2 :     consume_null_result(conn);
     567             : 
     568           2 :     if (PQexitPipelineMode(conn) != 0)
     569           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
     570             : 
     571           2 :     consume_result_status(conn, PGRES_PIPELINE_SYNC);
     572             : 
     573             :     /* second pipeline */
     574           2 :     consume_result_status(conn, PGRES_TUPLES_OK);
     575             : 
     576           2 :     consume_null_result(conn);
     577             : 
     578           2 :     if (PQexitPipelineMode(conn) != 0)
     579           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
     580             : 
     581           2 :     consume_result_status(conn, PGRES_PIPELINE_SYNC);
     582             : 
     583             :     /* third pipeline */
     584           2 :     consume_result_status(conn, PGRES_TUPLES_OK);
     585             : 
     586           2 :     consume_null_result(conn);
     587             : 
     588           2 :     consume_result_status(conn, PGRES_PIPELINE_SYNC);
     589             : 
     590             :     /* We're still in pipeline mode ... */
     591           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     592           0 :         pg_fatal("Fell out of pipeline mode somehow");
     593             : 
     594             :     /* until we end it, which we can safely do now */
     595           2 :     if (PQexitPipelineMode(conn) != 1)
     596           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     597             :                  PQerrorMessage(conn));
     598             : 
     599           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     600           0 :         pg_fatal("exiting pipeline mode didn't seem to work");
     601             : 
     602           2 :     fprintf(stderr, "ok\n");
     603           2 : }
     604             : 
     605             : /*
     606             :  * Send several queries in pipeline mode with no sync point at all, then
     607             :  * use a flush request to get the server to send back their results.
     608             :  */
     609             : static void
     610           2 : test_nosync(PGconn *conn)
     611             : {
     612           2 :     int         numqueries = 10;
     613           2 :     int         results = 0;
     614           2 :     int         sock = PQsocket(conn);
     615             : 
     616           2 :     fprintf(stderr, "nosync... ");
     617             : 
     618           2 :     if (sock < 0)
     619           0 :         pg_fatal("invalid socket");
     620             : 
     621           2 :     if (PQenterPipelineMode(conn) != 1)
     622           0 :         pg_fatal("could not enter pipeline mode");
     623          22 :     for (int i = 0; i < numqueries; i++)
     624             :     {
     625             :         fd_set      input_mask;
     626             :         struct timeval tv;
     627             : 
     628          20 :         if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
     629             :                               0, NULL, NULL, NULL, NULL, 0) != 1)
     630           0 :             pg_fatal("error sending select: %s", PQerrorMessage(conn));
     631          20 :         PQflush(conn);
     632             : 
     633             :         /*
     634             :          * Poll the socket with a zero timeout (never wait); if the server has written anything to us, read (some of) it now.
     635             :          */
     636         340 :         FD_ZERO(&input_mask);
     637          20 :         FD_SET(sock, &input_mask);
     638          20 :         tv.tv_sec = 0;
     639          20 :         tv.tv_usec = 0;
     640          20 :         if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
     641             :         {
     642           0 :             fprintf(stderr, "select() failed: %m\n");
     643           0 :             exit_nicely(conn);
     644             :         }
     645          20 :         if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
     646           0 :             pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
     647             :     }
     648             : 
     649             :     /* no sync was sent, so ask the server explicitly to flush its output buffer */
     650           2 :     if (PQsendFlushRequest(conn) != 1)
     651           0 :         pg_fatal("failed to send flush request");
     652           2 :     PQflush(conn);
     653             : 
     654             :     /* Now read all results */
     655             :     for (;;)
     656             :     {
     657             :         /* We expect exactly one TUPLES_OK result for each query we sent */
     658          20 :         consume_result_status(conn, PGRES_TUPLES_OK);
     659             : 
     660             :         /* and one NULL result should follow each */
     661          20 :         consume_null_result(conn);
     662             : 
     663          20 :         results++;
     664             : 
     665             :         /* stop once a result has been consumed for every query sent */
     666          20 :         if (results == numqueries)
     667           2 :             break;
     668             :     }
     669             : 
     670           2 :     fprintf(stderr, "ok\n");
     671           2 : }
     672             : 
     673             : /*
     674             :  * When an operation in a pipeline fails the rest of that pipeline is skipped. We
     675             :  * still have to get results for each pipeline item, but the item will just be
     676             :  * a PGRES_PIPELINE_ABORTED code.
     677             :  *
     678             :  * This intentionally doesn't use a transaction to wrap the pipeline. You should
     679             :  * usually use an xact, but in this case we want to observe the effects of each
     680             :  * statement.
     681             :  */
     682             : static void
     683           2 : test_pipeline_abort(PGconn *conn)
     684             : {
     685           2 :     PGresult   *res = NULL;
     686           2 :     const char *dummy_params[1] = {"1"};
     687           2 :     Oid         dummy_param_oids[1] = {INT4OID};
     688             :     int         i;
     689             :     int         gotrows;
     690             :     bool        goterror;
     691             : 
     692           2 :     fprintf(stderr, "aborted pipeline... ");
     693             : 
     694           2 :     res = PQexec(conn, drop_table_sql);
     695           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     696           0 :         pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
     697           2 :     PQclear(res);
     698             : 
     699           2 :     res = PQexec(conn, create_table_sql);
     700           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     701           0 :         pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
     702           2 :     PQclear(res);
     703             : 
     704             :     /*
     705             :      * Queue up a couple of small pipelines and process each without returning
     706             :      * to command mode first. Make sure the second operation in the first
     707             :      * pipeline ERRORs.
     708             :      */
     709           2 :     if (PQenterPipelineMode(conn) != 1)
     710           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     711             : 
     712           2 :     dummy_params[0] = "1";
     713           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     714             :                           dummy_params, NULL, NULL, 0) != 1)
     715           0 :         pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
     716             : 
     717           2 :     if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
     718             :                           1, dummy_param_oids, dummy_params,
     719             :                           NULL, NULL, 0) != 1)
     720           0 :         pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
     721             : 
     722           2 :     dummy_params[0] = "2";
     723           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     724             :                           dummy_params, NULL, NULL, 0) != 1)
     725           0 :         pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
     726             : 
     727           2 :     if (PQpipelineSync(conn) != 1)
     728           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     729             : 
     730           2 :     dummy_params[0] = "3";
     731           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     732             :                           dummy_params, NULL, NULL, 0) != 1)
     733           0 :         pg_fatal("dispatching second-pipeline insert failed: %s",
     734             :                  PQerrorMessage(conn));
     735             : 
     736           2 :     if (PQpipelineSync(conn) != 1)
     737           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     738             : 
     739             :     /*
     740             :      * OK, start processing the pipeline results.
     741             :      *
     742             :      * We should get a command-ok for the first insert, then a fatal error for
     743             :      * the bad SELECT, a pipeline-aborted result for the second insert and a
     744             :      * sync; then a command-ok and a sync for the second pipeline.
     745             :      */
     746           2 :     consume_result_status(conn, PGRES_COMMAND_OK);
     747             : 
     748             :     /* NULL result to signal end-of-results for this command */
     749           2 :     consume_null_result(conn);
     750             : 
     751             :     /* Second query caused error, so we expect an error next */
     752           2 :     consume_result_status(conn, PGRES_FATAL_ERROR);
     753             : 
     754             :     /* NULL result to signal end-of-results for this command */
     755           2 :     consume_null_result(conn);
     756             : 
     757             :     /*
     758             :      * pipeline should now be aborted.
     759             :      *
     760             :      * Note that we could still queue more queries at this point if we wanted;
     761             :      * they'd get added to a new third pipeline since we've already sent a
     762             :      * second. The aborted flag relates only to the pipeline being received.
     763             :      */
     764           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
     765           0 :         pg_fatal("pipeline should be flagged as aborted but isn't");
     766             : 
     767             :     /* third query in pipeline, the second insert */
     768           2 :     consume_result_status(conn, PGRES_PIPELINE_ABORTED);
     769             : 
     770             :     /* NULL result to signal end-of-results for this command */
     771           2 :     consume_null_result(conn);
     772             : 
     773           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
     774           0 :         pg_fatal("pipeline should be flagged as aborted but isn't");
     775             : 
     776             :     /* Ensure we're still in pipeline */
     777           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     778           0 :         pg_fatal("Fell out of pipeline mode somehow");
     779             : 
     780             :     /*
     781             :      * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
     782             :      *
     783             :      * (This is so clients know to start processing results normally again and
     784             :      * can tell the difference between skipped commands and the sync.)
     785             :      */
     786           2 :     consume_result_status(conn, PGRES_PIPELINE_SYNC);
     787             : 
     788           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
     789           0 :         pg_fatal("sync should've cleared the aborted flag but didn't");
     790             : 
     791             :     /* We're still in pipeline mode... */
     792           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     793           0 :         pg_fatal("Fell out of pipeline mode somehow");
     794             : 
     795             :     /* the insert from the second pipeline */
     796           2 :     consume_result_status(conn, PGRES_COMMAND_OK);
     797             : 
     798             :     /* Read the NULL result at the end of the command */
     799           2 :     consume_null_result(conn);
     800             : 
     801             :     /* the second pipeline sync */
     802           2 :     consume_result_status(conn, PGRES_PIPELINE_SYNC);
     803             : 
     804             :     /* Nothing further has been dispatched yet; expect a NULL result */
     805           2 :     consume_null_result(conn);
     806             : 
     807             :     /* Try to send two queries in one command; must fail with SQLSTATE 42601 */
     808           2 :     if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
     809           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
     810           2 :     if (PQpipelineSync(conn) != 1)
     811           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     812           2 :     goterror = false;
     813           4 :     while ((res = PQgetResult(conn)) != NULL)
     814             :     {
     815           2 :         switch (PQresultStatus(res))
     816             :         {
     817           2 :             case PGRES_FATAL_ERROR:
     818           2 :                 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0) /* NOTE(review): assumes the SQLSTATE field is non-NULL */
     819           0 :                     pg_fatal("expected error about multiple commands, got %s",
     820             :                              PQerrorMessage(conn));
     821           2 :                 printf("got expected %s", PQerrorMessage(conn));
     822           2 :                 goterror = true;
     823           2 :                 break;
     824           0 :             default:
     825           0 :                 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
     826             :                 break;
     827             :         }
     828           2 :         PQclear(res);
     829             :     }
     830           2 :     if (!goterror)
     831           0 :         pg_fatal("did not get cannot-insert-multiple-commands error");
     832             : 
     833             :     /* the sync sent after the multi-command query (third sync overall) */
     834           2 :     consume_result_status(conn, PGRES_PIPELINE_SYNC);
     835             : 
     836           2 :     fprintf(stderr, "ok\n"); /* NOTE(review): "ok" is printed again at the end of this function */
     837             : 
     838             :     /* Test single-row mode with an error partway: three rows, then division by zero */
     839           2 :     if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
     840             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
     841           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
     842           2 :     if (PQpipelineSync(conn) != 1)
     843           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     844           2 :     PQsetSingleRowMode(conn);
     845           2 :     goterror = false;
     846           2 :     gotrows = 0;
     847          10 :     while ((res = PQgetResult(conn)) != NULL)
     848             :     {
     849           8 :         switch (PQresultStatus(res))
     850             :         {
     851           6 :             case PGRES_SINGLE_TUPLE:
     852           6 :                 printf("got row: %s\n", PQgetvalue(res, 0, 0));
     853           6 :                 gotrows++;
     854           6 :                 break;
     855           2 :             case PGRES_FATAL_ERROR:
     856           2 :                 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0) /* NOTE(review): assumes the SQLSTATE field is non-NULL */
     857           0 :                     pg_fatal("expected division-by-zero, got: %s (%s)",
     858             :                              PQerrorMessage(conn),
     859             :                              PQresultErrorField(res, PG_DIAG_SQLSTATE));
     860           2 :                 printf("got expected division-by-zero\n");
     861           2 :                 goterror = true;
     862           2 :                 break;
     863           0 :             default:
     864           0 :                 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
     865             :         }
     866           8 :         PQclear(res);
     867             :     }
     868           2 :     if (!goterror)
     869           0 :         pg_fatal("did not get division-by-zero error");
     870           2 :     if (gotrows != 3)
     871           0 :         pg_fatal("did not get three rows");
     872             : 
     873             :     /* the sync sent after the single-row query (fourth sync overall) */
     874           2 :     consume_result_status(conn, PGRES_PIPELINE_SYNC);
     875             : 
     876             :     /* We're still in pipeline mode... */
     877           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     878           0 :         pg_fatal("Fell out of pipeline mode somehow");
     879             : 
     880             :     /* until we end it, which we can safely do now */
     881           2 :     if (PQexitPipelineMode(conn) != 1)
     882           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     883             :                  PQerrorMessage(conn));
     884             : 
     885           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     886           0 :         pg_fatal("exiting pipeline mode didn't seem to work");
     887             : 
     888             :     /*-
     889             :      * Since we fired the pipelines off without a surrounding xact, the results
     890             :      * should be:
     891             :      *
     892             :      * - Implicit xact started by server around 1st pipeline
     893             :      * - First insert applied
     894             :      * - Second statement aborted xact
     895             :      * - Third insert skipped
     896             :      * - Sync rolled back first implicit xact
     897             :      * - Implicit xact created by server around 2nd pipeline
     898             :      * - insert applied from 2nd pipeline
     899             :      * - Sync commits 2nd xact
     900             :      *
     901             :      * So we should only have the value 3 that we inserted.
     902             :      */
     903           2 :     res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
     904             : 
     905           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     906           0 :         pg_fatal("Expected tuples, got %s: %s",
     907             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     908           2 :     if (PQntuples(res) != 1)
     909           0 :         pg_fatal("expected 1 result, got %d", PQntuples(res));
     910           4 :     for (i = 0; i < PQntuples(res); i++)
     911             :     {
     912           2 :         const char *val = PQgetvalue(res, i, 0);
     913             : 
     914           2 :         if (strcmp(val, "3") != 0)
     915           0 :             pg_fatal("expected only insert with value 3, got %s", val);
     916             :     }
     917             : 
     918           2 :     PQclear(res);
     919             : 
     920           2 :     fprintf(stderr, "ok\n");
     921           2 : }
     922             : 
     923             : /* Steps, in order, of the send and receive state machines in test_pipelined_insert */
     924             : enum PipelineInsertStep
     925             : {
     926             :     BI_BEGIN_TX,                /* "BEGIN TRANSACTION" */
     927             :     BI_DROP_TABLE,              /* drop_table_sql */
     928             :     BI_CREATE_TABLE,            /* create_table_sql */
     929             :     BI_PREPARE,                 /* PQsendPrepare of "my_insert" */
     930             :     BI_INSERT_ROWS,             /* one PQsendQueryPrepared per row */
     931             :     BI_COMMIT_TX,               /* "COMMIT" */
     932             :     BI_SYNC,                    /* PQpipelineSync */
     933             :     BI_DONE,                    /* terminal state; the receive loop stops here */
     934             : };
     935             : 
     936             : static void
     937           2 : test_pipelined_insert(PGconn *conn, int n_rows)
     938             : {
     939           2 :     Oid         insert_param_oids[2] = {INT4OID, INT8OID};
     940             :     const char *insert_params[2];
     941             :     char        insert_param_0[MAXINTLEN];
     942             :     char        insert_param_1[MAXINT8LEN];
     943           2 :     enum PipelineInsertStep send_step = BI_BEGIN_TX,
     944           2 :                 recv_step = BI_BEGIN_TX;
     945             :     int         rows_to_send,
     946             :                 rows_to_receive;
     947             : 
     948           2 :     insert_params[0] = insert_param_0;
     949           2 :     insert_params[1] = insert_param_1;
     950             : 
     951           2 :     rows_to_send = rows_to_receive = n_rows;
     952             : 
     953             :     /*
     954             :      * Do a pipelined insert into a table created at the start of the pipeline
     955             :      */
     956           2 :     if (PQenterPipelineMode(conn) != 1)
     957           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     958             : 
     959           8 :     while (send_step != BI_PREPARE)
     960             :     {
     961             :         const char *sql;
     962             : 
     963           6 :         switch (send_step)
     964             :         {
     965           2 :             case BI_BEGIN_TX:
     966           2 :                 sql = "BEGIN TRANSACTION";
     967           2 :                 send_step = BI_DROP_TABLE;
     968           2 :                 break;
     969             : 
     970           2 :             case BI_DROP_TABLE:
     971           2 :                 sql = drop_table_sql;
     972           2 :                 send_step = BI_CREATE_TABLE;
     973           2 :                 break;
     974             : 
     975           2 :             case BI_CREATE_TABLE:
     976           2 :                 sql = create_table_sql;
     977           2 :                 send_step = BI_PREPARE;
     978           2 :                 break;
     979             : 
     980           0 :             default:
     981           0 :                 pg_fatal("invalid state");
     982             :                 sql = NULL;     /* keep compiler quiet */
     983             :         }
     984             : 
     985             :         pg_debug("sending: %s\n", sql);
     986           6 :         if (PQsendQueryParams(conn, sql,
     987             :                               0, NULL, NULL, NULL, NULL, 0) != 1)
     988           0 :             pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
     989             :     }
     990             : 
     991             :     Assert(send_step == BI_PREPARE);
     992             :     pg_debug("sending: %s\n", insert_sql2);
     993           2 :     if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
     994           0 :         pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
     995           2 :     send_step = BI_INSERT_ROWS;
     996             : 
     997             :     /*
     998             :      * Now we start inserting. We'll be sending enough data that we could fill
     999             :      * our output buffer, so to avoid deadlocking we need to enter nonblocking
    1000             :      * mode and consume input while we send more output. As results of each
    1001             :      * query are processed we should pop them to allow processing of the next
    1002             :      * query. There's no need to finish the pipeline before processing
    1003             :      * results.
    1004             :      */
    1005           2 :     if (PQsetnonblocking(conn, 1) != 0)
    1006           0 :         pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
    1007             : 
    1008       39852 :     while (recv_step != BI_DONE)
    1009             :     {
    1010             :         int         sock;
    1011             :         fd_set      input_mask;
    1012             :         fd_set      output_mask;
    1013             : 
    1014       39850 :         sock = PQsocket(conn);
    1015             : 
    1016       39850 :         if (sock < 0)
    1017           0 :             break;              /* shouldn't happen */
    1018             : 
    1019      677450 :         FD_ZERO(&input_mask);
    1020       39850 :         FD_SET(sock, &input_mask);
    1021      677450 :         FD_ZERO(&output_mask);
    1022       39850 :         FD_SET(sock, &output_mask);
    1023             : 
    1024       39850 :         if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
    1025             :         {
    1026           0 :             fprintf(stderr, "select() failed: %m\n");
    1027           0 :             exit_nicely(conn);
    1028             :         }
    1029             : 
    1030             :         /*
    1031             :          * Process any results, so we keep the server's output buffer free
    1032             :          * flowing and it can continue to process input
    1033             :          */
    1034       39850 :         if (FD_ISSET(sock, &input_mask))
    1035             :         {
    1036           4 :             PQconsumeInput(conn);
    1037             : 
    1038             :             /* Read until we'd block if we tried to read */
    1039        2826 :             while (!PQisBusy(conn) && recv_step < BI_DONE)
    1040             :             {
    1041             :                 PGresult   *res;
    1042        2822 :                 const char *cmdtag = "";
    1043        2822 :                 const char *description = "";
    1044             :                 int         status;
    1045             : 
    1046             :                 /*
    1047             :                  * Read next result.  If no more results from this query,
    1048             :                  * advance to the next query
    1049             :                  */
    1050        2822 :                 res = PQgetResult(conn);
    1051        2822 :                 if (res == NULL)
    1052        1410 :                     continue;
    1053             : 
    1054        1412 :                 status = PGRES_COMMAND_OK;
    1055        1412 :                 switch (recv_step)
    1056             :                 {
    1057           2 :                     case BI_BEGIN_TX:
    1058           2 :                         cmdtag = "BEGIN";
    1059           2 :                         recv_step++;
    1060           2 :                         break;
    1061           2 :                     case BI_DROP_TABLE:
    1062           2 :                         cmdtag = "DROP TABLE";
    1063           2 :                         recv_step++;
    1064           2 :                         break;
    1065           2 :                     case BI_CREATE_TABLE:
    1066           2 :                         cmdtag = "CREATE TABLE";
    1067           2 :                         recv_step++;
    1068           2 :                         break;
    1069           2 :                     case BI_PREPARE:
    1070           2 :                         cmdtag = "";
    1071           2 :                         description = "PREPARE";
    1072           2 :                         recv_step++;
    1073           2 :                         break;
    1074        1400 :                     case BI_INSERT_ROWS:
    1075        1400 :                         cmdtag = "INSERT";
    1076        1400 :                         rows_to_receive--;
    1077        1400 :                         if (rows_to_receive == 0)
    1078           2 :                             recv_step++;
    1079        1400 :                         break;
    1080           2 :                     case BI_COMMIT_TX:
    1081           2 :                         cmdtag = "COMMIT";
    1082           2 :                         recv_step++;
    1083           2 :                         break;
    1084           2 :                     case BI_SYNC:
    1085           2 :                         cmdtag = "";
    1086           2 :                         description = "SYNC";
    1087           2 :                         status = PGRES_PIPELINE_SYNC;
    1088           2 :                         recv_step++;
    1089           2 :                         break;
    1090           0 :                     case BI_DONE:
    1091             :                         /* unreachable */
    1092           0 :                         pg_fatal("unreachable state");
    1093             :                 }
    1094             : 
    1095        1412 :                 if (PQresultStatus(res) != status)
    1096           0 :                     pg_fatal("%s reported status %s, expected %s\n"
    1097             :                              "Error message: \"%s\"",
    1098             :                              description, PQresStatus(PQresultStatus(res)),
    1099             :                              PQresStatus(status), PQerrorMessage(conn));
    1100             : 
    1101        1412 :                 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
    1102           0 :                     pg_fatal("%s expected command tag '%s', got '%s'",
    1103             :                              description, cmdtag, PQcmdStatus(res));
    1104             : 
    1105             :                 pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
    1106             : 
    1107        1412 :                 PQclear(res);
    1108             :             }
    1109             :         }
    1110             : 
    1111             :         /* Write more rows and/or the end pipeline message, if needed */
    1112       39850 :         if (FD_ISSET(sock, &output_mask))
    1113             :         {
    1114       39848 :             PQflush(conn);
    1115             : 
    1116       39848 :             if (send_step == BI_INSERT_ROWS)
    1117             :             {
    1118        1400 :                 snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
    1119             :                 /* use up some buffer space with a wide value */
    1120        1400 :                 snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
    1121             : 
    1122        1400 :                 if (PQsendQueryPrepared(conn, "my_insert",
    1123             :                                         2, insert_params, NULL, NULL, 0) == 1)
    1124             :                 {
    1125             :                     pg_debug("sent row %d\n", rows_to_send);
    1126             : 
    1127        1400 :                     rows_to_send--;
    1128        1400 :                     if (rows_to_send == 0)
    1129           2 :                         send_step++;
    1130             :                 }
    1131             :                 else
    1132             :                 {
    1133             :                     /*
    1134             :                      * in nonblocking mode, so it's OK for an insert to fail
    1135             :                      * to send
    1136             :                      */
    1137           0 :                     fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
    1138             :                             rows_to_send, PQerrorMessage(conn));
    1139             :                 }
    1140             :             }
    1141       38448 :             else if (send_step == BI_COMMIT_TX)
    1142             :             {
    1143           2 :                 if (PQsendQueryParams(conn, "COMMIT",
    1144             :                                       0, NULL, NULL, NULL, NULL, 0) == 1)
    1145             :                 {
    1146             :                     pg_debug("sent COMMIT\n");
    1147           2 :                     send_step++;
    1148             :                 }
    1149             :                 else
    1150             :                 {
    1151           0 :                     fprintf(stderr, "WARNING: failed to send commit: %s\n",
    1152             :                             PQerrorMessage(conn));
    1153             :                 }
    1154             :             }
    1155       38446 :             else if (send_step == BI_SYNC)
    1156             :             {
    1157           2 :                 if (PQpipelineSync(conn) == 1)
    1158             :                 {
    1159           2 :                     fprintf(stdout, "pipeline sync sent\n");
    1160           2 :                     send_step++;
    1161             :                 }
    1162             :                 else
    1163             :                 {
    1164           0 :                     fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
    1165             :                             PQerrorMessage(conn));
    1166             :                 }
    1167             :             }
    1168             :         }
    1169             :     }
    1170             : 
    1171             :     /* We've got the sync message and the pipeline should be done */
    1172           2 :     if (PQexitPipelineMode(conn) != 1)
    1173           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
    1174             :                  PQerrorMessage(conn));
    1175             : 
    1176           2 :     if (PQsetnonblocking(conn, 0) != 0)
    1177           0 :         pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
    1178             : 
    1179           2 :     fprintf(stderr, "ok\n");
    1180           2 : }
    1181             : 
    1182             : static void
    1183           2 : test_prepared(PGconn *conn)
    1184             : {
    1185           2 :     PGresult   *res = NULL;
    1186           2 :     Oid         param_oids[1] = {INT4OID};
    1187             :     Oid         expected_oids[4];
    1188             :     Oid         typ;
    1189             : 
    1190           2 :     fprintf(stderr, "prepared... ");
    1191             : 
    1192           2 :     if (PQenterPipelineMode(conn) != 1)
    1193           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1194           2 :     if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
    1195             :                       "interval '1 sec'",
    1196             :                       1, param_oids) != 1)
    1197           0 :         pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
    1198           2 :     expected_oids[0] = INT4OID;
    1199           2 :     expected_oids[1] = TEXTOID;
    1200           2 :     expected_oids[2] = NUMERICOID;
    1201           2 :     expected_oids[3] = INTERVALOID;
    1202           2 :     if (PQsendDescribePrepared(conn, "select_one") != 1)
    1203           0 :         pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
    1204           2 :     if (PQpipelineSync(conn) != 1)
    1205           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1206             : 
    1207           2 :     consume_result_status(conn, PGRES_COMMAND_OK);
    1208             : 
    1209           2 :     consume_null_result(conn);
    1210             : 
    1211           2 :     res = confirm_result_status(conn, PGRES_COMMAND_OK);
    1212           2 :     if (PQnfields(res) != lengthof(expected_oids))
    1213           0 :         pg_fatal("expected %zu columns, got %d",
    1214             :                  lengthof(expected_oids), PQnfields(res));
    1215          10 :     for (int i = 0; i < PQnfields(res); i++)
    1216             :     {
    1217           8 :         typ = PQftype(res, i);
    1218           8 :         if (typ != expected_oids[i])
    1219           0 :             pg_fatal("field %d: expected type %u, got %u",
    1220             :                      i, expected_oids[i], typ);
    1221             :     }
    1222           2 :     PQclear(res);
    1223             : 
    1224           2 :     consume_null_result(conn);
    1225             : 
    1226           2 :     consume_result_status(conn, PGRES_PIPELINE_SYNC);
    1227             : 
    1228           2 :     fprintf(stderr, "closing statement..");
    1229           2 :     if (PQsendClosePrepared(conn, "select_one") != 1)
    1230           0 :         pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
    1231           2 :     if (PQpipelineSync(conn) != 1)
    1232           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1233             : 
    1234           2 :     consume_result_status(conn, PGRES_COMMAND_OK);
    1235             : 
    1236           2 :     consume_null_result(conn);
    1237             : 
    1238           2 :     consume_result_status(conn, PGRES_PIPELINE_SYNC);
    1239             : 
    1240           2 :     if (PQexitPipelineMode(conn) != 1)
    1241           0 :         pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
    1242             : 
    1243             :     /* Now that it's closed we should get an error when describing */
    1244           2 :     res = PQdescribePrepared(conn, "select_one");
    1245           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
    1246           0 :         pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
    1247           2 :     PQclear(res);
    1248             : 
    1249             :     /*
    1250             :      * Also test the blocking close, this should not fail since closing a
    1251             :      * non-existent prepared statement is a no-op
    1252             :      */
    1253           2 :     res = PQclosePrepared(conn, "select_one");
    1254           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1255           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1256           2 :     PQclear(res);
    1257             : 
    1258           2 :     fprintf(stderr, "creating portal... ");
    1259             : 
    1260           2 :     res = PQexec(conn, "BEGIN");
    1261           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1262           0 :         pg_fatal("BEGIN failed: %s", PQerrorMessage(conn));
    1263           2 :     PQclear(res);
    1264             : 
    1265           2 :     res = PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
    1266           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1267           0 :         pg_fatal("DECLARE CURSOR failed: %s", PQerrorMessage(conn));
    1268           2 :     PQclear(res);
    1269             : 
    1270           2 :     PQenterPipelineMode(conn);
    1271           2 :     if (PQsendDescribePortal(conn, "cursor_one") != 1)
    1272           0 :         pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
    1273           2 :     if (PQpipelineSync(conn) != 1)
    1274           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1275             : 
    1276           2 :     res = confirm_result_status(conn, PGRES_COMMAND_OK);
    1277           2 :     typ = PQftype(res, 0);
    1278           2 :     if (typ != INT4OID)
    1279           0 :         pg_fatal("portal: expected type %u, got %u",
    1280             :                  INT4OID, typ);
    1281           2 :     PQclear(res);
    1282             : 
    1283           2 :     consume_null_result(conn);
    1284             : 
    1285           2 :     consume_result_status(conn, PGRES_PIPELINE_SYNC);
    1286             : 
    1287           2 :     fprintf(stderr, "closing portal... ");
    1288           2 :     if (PQsendClosePortal(conn, "cursor_one") != 1)
    1289           0 :         pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
    1290           2 :     if (PQpipelineSync(conn) != 1)
    1291           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1292             : 
    1293           2 :     consume_result_status(conn, PGRES_COMMAND_OK);
    1294             : 
    1295           2 :     consume_null_result(conn);
    1296             : 
    1297           2 :     consume_result_status(conn, PGRES_PIPELINE_SYNC);
    1298             : 
    1299           2 :     if (PQexitPipelineMode(conn) != 1)
    1300           0 :         pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
    1301             : 
    1302             :     /* Now that it's closed we should get an error when describing */
    1303           2 :     res = PQdescribePortal(conn, "cursor_one");
    1304           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
    1305           0 :         pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
    1306           2 :     PQclear(res);
    1307             : 
    1308             :     /*
    1309             :      * Also test the blocking close, this should not fail since closing a
    1310             :      * non-existent portal is a no-op
    1311             :      */
    1312           2 :     res = PQclosePortal(conn, "cursor_one");
    1313           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1314           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1315           2 :     PQclear(res);
    1316             : 
    1317           2 :     fprintf(stderr, "ok\n");
    1318           2 : }
    1319             : 
    1320             : /*
    1321             :  * Test max_protocol_version options.
    1322             :  */
    1323             : static void
    1324           2 : test_protocol_version(PGconn *conn)
    1325             : {
    1326             :     const char **keywords;
    1327             :     const char **vals;
    1328             :     int         nopts;
    1329           2 :     PQconninfoOption *opts = PQconninfo(conn);
    1330             :     int         protocol_version;
    1331           2 :     int         max_protocol_version_index = -1;
    1332             :     int         i;
    1333             : 
    1334             :     /* Prepare keywords/vals arrays, copied from the existing connection. */
    1335           2 :     nopts = 0;
    1336         104 :     for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
    1337         102 :         nopts++;
    1338           2 :     nopts++;                    /* NULL terminator */
    1339             : 
    1340           2 :     keywords = pg_malloc0(sizeof(char *) * nopts);
    1341           2 :     vals = pg_malloc0(sizeof(char *) * nopts);
    1342             : 
    1343           2 :     i = 0;
    1344         104 :     for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
    1345             :     {
    1346             :         /*
    1347             :          * If the test already specified max_protocol_version, we want to
    1348             :          * replace it rather than attempting to override it. This matters when
    1349             :          * testing defaults, because empty option values at the end of the
    1350             :          * connection string won't replace earlier settings.
    1351             :          */
    1352         102 :         if (strcmp(opt->keyword, "max_protocol_version") == 0)
    1353           2 :             max_protocol_version_index = i;
    1354         100 :         else if (!opt->val)
    1355          62 :             continue;
    1356             : 
    1357          40 :         keywords[i] = opt->keyword;
    1358          40 :         vals[i] = opt->val;
    1359             : 
    1360          40 :         i++;
    1361             :     }
    1362             : 
    1363             :     Assert(max_protocol_version_index >= 0);
    1364             : 
    1365             :     /*
    1366             :      * Test default protocol_version
    1367             :      */
    1368           2 :     vals[max_protocol_version_index] = "";
    1369           2 :     conn = PQconnectdbParams(keywords, vals, false);
    1370             : 
    1371           2 :     if (PQstatus(conn) != CONNECTION_OK)
    1372           0 :         pg_fatal("Connection to database failed: %s",
    1373             :                  PQerrorMessage(conn));
    1374             : 
    1375           2 :     protocol_version = PQfullProtocolVersion(conn);
    1376           2 :     if (protocol_version != 30000)
    1377           0 :         pg_fatal("expected 30000, got %d", protocol_version);
    1378             : 
    1379           2 :     PQfinish(conn);
    1380             : 
    1381             :     /*
    1382             :      * Test max_protocol_version=3.0
    1383             :      */
    1384           2 :     vals[max_protocol_version_index] = "3.0";
    1385           2 :     conn = PQconnectdbParams(keywords, vals, false);
    1386             : 
    1387           2 :     if (PQstatus(conn) != CONNECTION_OK)
    1388           0 :         pg_fatal("Connection to database failed: %s",
    1389             :                  PQerrorMessage(conn));
    1390             : 
    1391           2 :     protocol_version = PQfullProtocolVersion(conn);
    1392           2 :     if (protocol_version != 30000)
    1393           0 :         pg_fatal("expected 30000, got %d", protocol_version);
    1394             : 
    1395           2 :     PQfinish(conn);
    1396             : 
    1397             :     /*
    1398             :      * Test max_protocol_version=3.1. It's not valid, we went straight from
    1399             :      * 3.0 to 3.2.
    1400             :      */
    1401           2 :     vals[max_protocol_version_index] = "3.1";
    1402           2 :     conn = PQconnectdbParams(keywords, vals, false);
    1403             : 
    1404           2 :     if (PQstatus(conn) != CONNECTION_BAD)
    1405           0 :         pg_fatal("Connecting with max_protocol_version 3.1 should have failed.");
    1406             : 
    1407           2 :     PQfinish(conn);
    1408             : 
    1409             :     /*
    1410             :      * Test max_protocol_version=3.2
    1411             :      */
    1412           2 :     vals[max_protocol_version_index] = "3.2";
    1413           2 :     conn = PQconnectdbParams(keywords, vals, false);
    1414             : 
    1415           2 :     if (PQstatus(conn) != CONNECTION_OK)
    1416           0 :         pg_fatal("Connection to database failed: %s",
    1417             :                  PQerrorMessage(conn));
    1418             : 
    1419           2 :     protocol_version = PQfullProtocolVersion(conn);
    1420           2 :     if (protocol_version != 30002)
    1421           0 :         pg_fatal("expected 30002, got %d", protocol_version);
    1422             : 
    1423           2 :     PQfinish(conn);
    1424             : 
    1425             :     /*
    1426             :      * Test max_protocol_version=latest. 'latest' currently means '3.2'.
    1427             :      */
    1428           2 :     vals[max_protocol_version_index] = "latest";
    1429           2 :     conn = PQconnectdbParams(keywords, vals, false);
    1430             : 
    1431           2 :     if (PQstatus(conn) != CONNECTION_OK)
    1432           0 :         pg_fatal("Connection to database failed: %s",
    1433             :                  PQerrorMessage(conn));
    1434             : 
    1435           2 :     protocol_version = PQfullProtocolVersion(conn);
    1436           2 :     if (protocol_version != 30002)
    1437           0 :         pg_fatal("expected 30002, got %d", protocol_version);
    1438             : 
    1439           2 :     PQfinish(conn);
    1440             : 
    1441           2 :     pfree(keywords);
    1442           2 :     pfree(vals);
    1443           2 :     PQconninfoFree(opts);
    1444           2 : }
    1445             : 
    1446             : /* Notice processor: print notices, and count how many we got */
    1447             : static void
    1448           2 : notice_processor(void *arg, const char *message)
    1449             : {
    1450           2 :     int        *n_notices = (int *) arg;
    1451             : 
    1452           2 :     (*n_notices)++;
    1453           2 :     fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
    1454           2 : }
    1455             : 
    1456             : /* Verify behavior in "idle" state */
    1457             : static void
    1458           2 : test_pipeline_idle(PGconn *conn)
    1459             : {
    1460           2 :     int         n_notices = 0;
    1461             : 
    1462           2 :     fprintf(stderr, "\npipeline idle...\n");
    1463             : 
    1464           2 :     PQsetNoticeProcessor(conn, notice_processor, &n_notices);
    1465             : 
    1466             :     /* Try to exit pipeline mode in pipeline-idle state */
    1467           2 :     if (PQenterPipelineMode(conn) != 1)
    1468           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1469           2 :     if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1470           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1471           2 :     PQsendFlushRequest(conn);
    1472             : 
    1473           2 :     consume_result_status(conn, PGRES_TUPLES_OK);
    1474             : 
    1475           2 :     consume_null_result(conn);
    1476             : 
    1477           2 :     if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1478           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1479           2 :     if (PQexitPipelineMode(conn) == 1)
    1480           0 :         pg_fatal("exiting pipeline succeeded when it shouldn't");
    1481           2 :     if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
    1482             :                 strlen("cannot exit pipeline mode")) != 0)
    1483           0 :         pg_fatal("did not get expected error; got: %s",
    1484             :                  PQerrorMessage(conn));
    1485           2 :     PQsendFlushRequest(conn);
    1486             : 
    1487           2 :     consume_result_status(conn, PGRES_TUPLES_OK);
    1488             : 
    1489           2 :     consume_null_result(conn);
    1490             : 
    1491           2 :     if (PQexitPipelineMode(conn) != 1)
    1492           0 :         pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
    1493             : 
    1494           2 :     if (n_notices > 0)
    1495           0 :         pg_fatal("got %d notice(s)", n_notices);
    1496           2 :     fprintf(stderr, "ok - 1\n");
    1497             : 
    1498             :     /* Have a WARNING in the middle of a resultset */
    1499           2 :     if (PQenterPipelineMode(conn) != 1)
    1500           0 :         pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
    1501           2 :     if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1502           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1503           2 :     PQsendFlushRequest(conn);
    1504             : 
    1505           2 :     consume_result_status(conn, PGRES_TUPLES_OK);
    1506             : 
    1507           2 :     if (PQexitPipelineMode(conn) != 1)
    1508           0 :         pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
    1509           2 :     fprintf(stderr, "ok - 2\n");
    1510           2 : }
    1511             : 
    1512             : static void
    1513           2 : test_simple_pipeline(PGconn *conn)
    1514             : {
    1515           2 :     const char *dummy_params[1] = {"1"};
    1516           2 :     Oid         dummy_param_oids[1] = {INT4OID};
    1517             : 
    1518           2 :     fprintf(stderr, "simple pipeline... ");
    1519             : 
    1520             :     /*
    1521             :      * Enter pipeline mode and dispatch a set of operations, which we'll then
    1522             :      * process the results of as they come in.
    1523             :      *
    1524             :      * For a simple case we should be able to do this without interim
    1525             :      * processing of results since our output buffer will give us enough slush
    1526             :      * to work with and we won't block on sending. So blocking mode is fine.
    1527             :      */
    1528           2 :     if (PQisnonblocking(conn))
    1529           0 :         pg_fatal("Expected blocking connection mode");
    1530             : 
    1531           2 :     if (PQenterPipelineMode(conn) != 1)
    1532           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1533             : 
    1534           2 :     if (PQsendQueryParams(conn, "SELECT $1",
    1535             :                           1, dummy_param_oids, dummy_params,
    1536             :                           NULL, NULL, 0) != 1)
    1537           0 :         pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
    1538             : 
    1539           2 :     if (PQexitPipelineMode(conn) != 0)
    1540           0 :         pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
    1541             : 
    1542           2 :     if (PQpipelineSync(conn) != 1)
    1543           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1544             : 
    1545           2 :     consume_result_status(conn, PGRES_TUPLES_OK);
    1546             : 
    1547           2 :     consume_null_result(conn);
    1548             : 
    1549             :     /*
    1550             :      * Even though we've processed the result there's still a sync to come and
    1551             :      * we can't exit pipeline mode yet
    1552             :      */
    1553           2 :     if (PQexitPipelineMode(conn) != 0)
    1554           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
    1555             : 
    1556           2 :     consume_result_status(conn, PGRES_PIPELINE_SYNC);
    1557             : 
    1558           2 :     consume_null_result(conn);
    1559             : 
    1560             :     /* We're still in pipeline mode... */
    1561           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
    1562           0 :         pg_fatal("Fell out of pipeline mode somehow");
    1563             : 
    1564             :     /* ... until we end it, which we can safely do now */
    1565           2 :     if (PQexitPipelineMode(conn) != 1)
    1566           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
    1567             :                  PQerrorMessage(conn));
    1568             : 
    1569           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
    1570           0 :         pg_fatal("Exiting pipeline mode didn't seem to work");
    1571             : 
    1572           2 :     fprintf(stderr, "ok\n");
    1573           2 : }
    1574             : 
    1575             : static void
    1576           2 : test_singlerowmode(PGconn *conn)
    1577             : {
    1578             :     PGresult   *res;
    1579             :     int         i;
    1580           2 :     bool        pipeline_ended = false;
    1581             : 
    1582           2 :     if (PQenterPipelineMode(conn) != 1)
    1583           0 :         pg_fatal("failed to enter pipeline mode: %s",
    1584             :                  PQerrorMessage(conn));
    1585             : 
    1586             :     /* One series of three commands, using single-row mode for the first two. */
    1587           8 :     for (i = 0; i < 3; i++)
    1588             :     {
    1589             :         char       *param[1];
    1590             : 
    1591           6 :         param[0] = psprintf("%d", 44 + i);
    1592             : 
    1593           6 :         if (PQsendQueryParams(conn,
    1594             :                               "SELECT generate_series(42, $1)",
    1595             :                               1,
    1596             :                               NULL,
    1597             :                               (const char *const *) param,
    1598             :                               NULL,
    1599             :                               NULL,
    1600             :                               0) != 1)
    1601           0 :             pg_fatal("failed to send query: %s",
    1602             :                      PQerrorMessage(conn));
    1603           6 :         pfree(param[0]);
    1604             :     }
    1605           2 :     if (PQpipelineSync(conn) != 1)
    1606           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1607             : 
    1608          10 :     for (i = 0; !pipeline_ended; i++)
    1609             :     {
    1610           8 :         bool        first = true;
    1611             :         bool        saw_ending_tuplesok;
    1612           8 :         bool        isSingleTuple = false;
    1613             : 
    1614             :         /* Set single row mode for only first 2 SELECT queries */
    1615           8 :         if (i < 2)
    1616             :         {
    1617           4 :             if (PQsetSingleRowMode(conn) != 1)
    1618           0 :                 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
    1619             :         }
    1620             : 
    1621             :         /* Consume rows for this query */
    1622           8 :         saw_ending_tuplesok = false;
    1623          28 :         while ((res = PQgetResult(conn)) != NULL)
    1624             :         {
    1625          22 :             ExecStatusType est = PQresultStatus(res);
    1626             : 
    1627          22 :             if (est == PGRES_PIPELINE_SYNC)
    1628             :             {
    1629           2 :                 fprintf(stderr, "end of pipeline reached\n");
    1630           2 :                 pipeline_ended = true;
    1631           2 :                 PQclear(res);
    1632           2 :                 if (i != 3)
    1633           0 :                     pg_fatal("Expected three results, got %d", i);
    1634           2 :                 break;
    1635             :             }
    1636             : 
    1637             :             /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
    1638          20 :             if (first)
    1639             :             {
    1640           6 :                 if (i <= 1 && est != PGRES_SINGLE_TUPLE)
    1641           0 :                     pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
    1642             :                              i, PQresStatus(est));
    1643           6 :                 if (i >= 2 && est != PGRES_TUPLES_OK)
    1644           0 :                     pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
    1645             :                              i, PQresStatus(est));
    1646           6 :                 first = false;
    1647             :             }
    1648             : 
    1649          20 :             fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
    1650          20 :             switch (est)
    1651             :             {
    1652           6 :                 case PGRES_TUPLES_OK:
    1653           6 :                     fprintf(stderr, ", tuples: %d\n", PQntuples(res));
    1654           6 :                     saw_ending_tuplesok = true;
    1655           6 :                     if (isSingleTuple)
    1656             :                     {
    1657           4 :                         if (PQntuples(res) == 0)
    1658           4 :                             fprintf(stderr, "all tuples received in query %d\n", i);
    1659             :                         else
    1660           0 :                             pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
    1661             :                     }
    1662           6 :                     break;
    1663             : 
    1664          14 :                 case PGRES_SINGLE_TUPLE:
    1665          14 :                     isSingleTuple = true;
    1666          14 :                     fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
    1667          14 :                     break;
    1668             : 
    1669           0 :                 default:
    1670           0 :                     pg_fatal("unexpected");
    1671             :             }
    1672          20 :             PQclear(res);
    1673             :         }
    1674           8 :         if (!pipeline_ended && !saw_ending_tuplesok)
    1675           0 :             pg_fatal("didn't get expected terminating TUPLES_OK");
    1676             :     }
    1677             : 
    1678             :     /*
    1679             :      * Now issue one command, get its results in with single-row mode, then
    1680             :      * issue another command, and get its results in normal mode; make sure
    1681             :      * the single-row mode flag is reset as expected.
    1682             :      */
    1683           2 :     if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
    1684             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1685           0 :         pg_fatal("failed to send query: %s",
    1686             :                  PQerrorMessage(conn));
    1687           2 :     if (PQsendFlushRequest(conn) != 1)
    1688           0 :         pg_fatal("failed to send flush request");
    1689           2 :     if (PQsetSingleRowMode(conn) != 1)
    1690           0 :         pg_fatal("PQsetSingleRowMode() failed");
    1691             : 
    1692           2 :     consume_result_status(conn, PGRES_SINGLE_TUPLE);
    1693             : 
    1694           2 :     consume_result_status(conn, PGRES_TUPLES_OK);
    1695             : 
    1696           2 :     consume_null_result(conn);
    1697             : 
    1698           2 :     if (PQsendQueryParams(conn, "SELECT 1",
    1699             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1700           0 :         pg_fatal("failed to send query: %s",
    1701             :                  PQerrorMessage(conn));
    1702           2 :     if (PQsendFlushRequest(conn) != 1)
    1703           0 :         pg_fatal("failed to send flush request");
    1704             : 
    1705           2 :     consume_result_status(conn, PGRES_TUPLES_OK);
    1706             : 
    1707           2 :     consume_null_result(conn);
    1708             : 
    1709             :     /*
    1710             :      * Try chunked mode as well; make sure that it correctly delivers a
    1711             :      * partial final chunk.
    1712             :      */
    1713           2 :     if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
    1714             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1715           0 :         pg_fatal("failed to send query: %s",
    1716             :                  PQerrorMessage(conn));
    1717           2 :     if (PQsendFlushRequest(conn) != 1)
    1718           0 :         pg_fatal("failed to send flush request");
    1719           2 :     if (PQsetChunkedRowsMode(conn, 3) != 1)
    1720           0 :         pg_fatal("PQsetChunkedRowsMode() failed");
    1721             : 
    1722           2 :     res = confirm_result_status(conn, PGRES_TUPLES_CHUNK);
    1723           2 :     if (PQntuples(res) != 3)
    1724           0 :         pg_fatal("Expected 3 rows, got %d", PQntuples(res));
    1725           2 :     PQclear(res);
    1726             : 
    1727           2 :     res = confirm_result_status(conn, PGRES_TUPLES_CHUNK);
    1728           2 :     if (PQntuples(res) != 2)
    1729           0 :         pg_fatal("Expected 2 rows, got %d", PQntuples(res));
    1730           2 :     PQclear(res);
    1731             : 
    1732           2 :     res = confirm_result_status(conn, PGRES_TUPLES_OK);
    1733           2 :     if (PQntuples(res) != 0)
    1734           0 :         pg_fatal("Expected 0 rows, got %d", PQntuples(res));
    1735           2 :     PQclear(res);
    1736             : 
    1737           2 :     consume_null_result(conn);
    1738             : 
    1739           2 :     if (PQexitPipelineMode(conn) != 1)
    1740           0 :         pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
    1741             : 
    1742           2 :     fprintf(stderr, "ok\n");
    1743           2 : }
    1744             : 
    1745             : /*
    1746             :  * Simple test to verify that a pipeline is discarded as a whole when there's
    1747             :  * an error, ignoring transaction commands.
                     :  *
                     :  * The test (re)creates table pq_pipeline_tst, enters pipeline mode, and
                     :  * queues the following without reading any result in between:
                     :  *
                     :  *   1. PQsendPrepare of "ROLLBACK" as statement "rollback", BEGIN,
                     :  *      SELECT 0/0, execution of "rollback", INSERT of value 1, sync;
                     :  *   2. INSERT of value 2, sync;
                     :  *   3. execution of "rollback", INSERT of value 3, sync, sync.
                     :  *
                     :  * It then reads results until every queued sync has been reported, checking
                     :  * that each non-sync result is followed by a NULL from PQgetResult().
                     :  * Finally it leaves pipeline mode and verifies that the table holds exactly
                     :  * one row, whose value is "3".
                     :  *
                     :  * Failures are reported via pg_fatal(); on success "ok" is written to
                     :  * stderr.
    1748             :  */
    1749             : static void
    1750           2 : test_transaction(PGconn *conn)
    1751             : {
    1752             :     PGresult   *res;
    1753             :     bool        expect_null;    /* next PQgetResult() must return NULL */
    1754           2 :     int         num_syncs = 0;  /* syncs sent but not yet read back */
    1755             : 
    1756           2 :     res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
    1757             :                  "CREATE TABLE pq_pipeline_tst (id int)");
    1758           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1759           0 :         pg_fatal("failed to create test table: %s",
    1760             :                  PQerrorMessage(conn));
    1761           2 :     PQclear(res);
    1762             : 
    1763           2 :     if (PQenterPipelineMode(conn) != 1)
    1764           0 :         pg_fatal("failed to enter pipeline mode: %s",
    1765             :                  PQerrorMessage(conn));
    1766           2 :     if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
    1767           0 :         pg_fatal("could not send prepare on pipeline: %s",
    1768             :                  PQerrorMessage(conn));
    1769             : 
    1770           2 :     if (PQsendQueryParams(conn,
    1771             :                           "BEGIN",
    1772             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1773           0 :         pg_fatal("failed to send query: %s",
    1774             :                  PQerrorMessage(conn));
    1775           2 :     if (PQsendQueryParams(conn,
    1776             :                           "SELECT 0/0",
    1777             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1778           0 :         pg_fatal("failed to send query: %s",
    1779             :                  PQerrorMessage(conn));
    1780             : 
    1781             :     /*
    1782             :      * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
    1783             :      * get out of the pipeline-aborted state first.
    1784             :      */
    1785           2 :     if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
    1786           0 :         pg_fatal("failed to execute prepared: %s",
    1787             :                  PQerrorMessage(conn));
    1788             : 
    1789             :     /* This insert fails because we're in pipeline-aborted state */
    1790           2 :     if (PQsendQueryParams(conn,
    1791             :                           "INSERT INTO pq_pipeline_tst VALUES (1)",
    1792             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1793           0 :         pg_fatal("failed to send query: %s",
    1794             :                  PQerrorMessage(conn));
    1795           2 :     if (PQpipelineSync(conn) != 1)
    1796           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1797           2 :     num_syncs++;
    1798             : 
    1799             :     /*
    1800             :      * This insert fails even though the pipeline got a SYNC, because we're in
    1801             :      * an aborted transaction
    1802             :      */
    1803           2 :     if (PQsendQueryParams(conn,
    1804             :                           "INSERT INTO pq_pipeline_tst VALUES (2)",
    1805             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1806           0 :         pg_fatal("failed to send query: %s",
    1807             :                  PQerrorMessage(conn));
    1808           2 :     if (PQpipelineSync(conn) != 1)
    1809           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1810           2 :     num_syncs++;
    1811             : 
    1812             :     /*
    1813             :      * Send ROLLBACK using prepared stmt. This one works because we just did
    1814             :      * PQpipelineSync above.
    1815             :      */
    1816           2 :     if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
    1817           0 :         pg_fatal("failed to execute prepared: %s",
    1818             :                  PQerrorMessage(conn));
    1819             : 
    1820             :     /*
    1821             :      * Now that we're out of a transaction and in pipeline-good mode, this
    1822             :      * insert works
    1823             :      */
    1824           2 :     if (PQsendQueryParams(conn,
    1825             :                           "INSERT INTO pq_pipeline_tst VALUES (3)",
    1826             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1827           0 :         pg_fatal("failed to send query: %s",
    1828             :                  PQerrorMessage(conn));
    1829             :     /* Send two syncs now -- match up to SYNC messages below */
    1830           2 :     if (PQpipelineSync(conn) != 1)
    1831           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1832           2 :     num_syncs++;
    1833           2 :     if (PQpipelineSync(conn) != 1)
    1834           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1835           2 :     num_syncs++;
    1836             :     /*
                     :      * Read results until all syncs sent above have been seen.  A result
                     :      * whose status is PGRES_PIPELINE_SYNC decrements num_syncs; any other
                     :      * non-NULL result sets expect_null, so that the following call to
                     :      * PQgetResult() is required to return NULL.  The statuses themselves
                     :      * are only printed, not compared against expected values.
                     :      */
    1837           2 :     expect_null = false;
    1838           2 :     for (int i = 0;; i++)
    1839          38 :     {
    1840             :         ExecStatusType restype;
    1841             : 
    1842          40 :         res = PQgetResult(conn);
    1843          40 :         if (res == NULL)
    1844             :         {
    1845          16 :             printf("%d: got NULL result\n", i);
    1846          16 :             if (!expect_null)
    1847           0 :                 pg_fatal("did not expect NULL here");
    1848          16 :             expect_null = false;
    1849          16 :             continue;
    1850             :         }
    1851          24 :         restype = PQresultStatus(res);
    1852          24 :         printf("%d: got status %s", i, PQresStatus(restype));
    1853          24 :         if (expect_null)
    1854           0 :             pg_fatal("expected NULL");
    1855          24 :         if (restype == PGRES_FATAL_ERROR)
    1856           4 :             printf("; error: %s", PQerrorMessage(conn));    /* no "\n" printed here; presumably the message already ends with one */
    1857          20 :         else if (restype == PGRES_PIPELINE_ABORTED)
    1858             :         {
    1859           4 :             printf(": command didn't run because pipeline aborted\n");
    1860             :         }
    1861             :         else
    1862          16 :             printf("\n");
    1863          24 :         PQclear(res);
    1864             : 
    1865          24 :         if (restype == PGRES_PIPELINE_SYNC)
    1866           8 :             num_syncs--;
    1867             :         else
    1868          16 :             expect_null = true;
    1869          24 :         if (num_syncs <= 0)
    1870           2 :             break;
    1871             :     }
    1872             : 
    1873           2 :     consume_null_result(conn);
    1874             : 
    1875           2 :     if (PQexitPipelineMode(conn) != 1)
    1876           0 :         pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
    1877             : 
    1878             :     /* We expect to find one tuple containing the value "3" */
    1879           2 :     res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
    1880           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1881           0 :         pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
    1882           2 :     if (PQntuples(res) != 1)
    1883           0 :         pg_fatal("did not get 1 tuple");
    1884           2 :     if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
    1885           0 :         pg_fatal("did not get expected tuple");
    1886           2 :     PQclear(res);
    1887             : 
    1888           2 :     fprintf(stderr, "ok\n");
    1889           2 : }
    1890             : 
    1891             : /*
    1892             :  * In this test mode we send a stream of queries, with one in the middle
    1893             :  * causing an error.  Verify that we can still send some more after the
    1894             :  * error and have libpq work properly.
                     :  *
                     :  * The connection is put in nonblocking mode, table ppln_uniqviol is
                     :  * (re)created, a transaction is begun, and a two-parameter INSERT ...
                     :  * RETURNING statement is prepared as "insertion".  Then, in pipeline mode,
                     :  * the outer loop alternates between two phases:
                     :  *
                     :  * - reading: while PQisBusy() returns 0, fetch one result at a time through
                     :  *   process_result(), until as many results have been processed as queries
                     :  *   were sent;
                     :  * - writing: when select() reports the socket writable, send executions of
                     :  *   "insertion" with increasing ids until PQflush() returns 1, then go back
                     :  *   to reading.
                     :  *
                     :  * After the first such switch, exactly one execution re-sends an id that
                     :  * was already used (numsent / 2) instead of the next counter value, to
                     :  * provoke the uniqueness violation.  Writing ends with a flush request.
                     :  * The test passes if exactly one error result was received: a second error
                     :  * or no error at all is reported via pg_fatal().
                     :  *
                     :  * Note that this function neither sends a pipeline sync nor leaves pipeline
                     :  * mode, and it does not end the transaction it began.
    1895             :  */
    1896             : static void
    1897           2 : test_uniqviol(PGconn *conn)
    1898             : {
    1899           2 :     int         sock = PQsocket(conn);
    1900             :     PGresult   *res;
    1901           2 :     Oid         paramTypes[2] = {INT8OID, INT8OID};
    1902             :     const char *paramValues[2];
    1903             :     char        paramValue0[MAXINT8LEN];    /* text of the id; rewritten for every query */
    1904             :     char        paramValue1[MAXINT8LEN];    /* text of idata; always "42" */
    1905           2 :     int         ctr = 0;    /* next fresh id to insert */
    1906           2 :     int         numsent = 0;    /* queries sent so far */
    1907           2 :     int         results = 0;    /* results processed so far */
    1908           2 :     bool        read_done = false;  /* all results read after writing ended */
    1909           2 :     bool        write_done = false; /* no more queries will be sent */
    1910           2 :     bool        error_sent = false; /* the duplicate id has been sent */
    1911           2 :     bool        got_error = false;  /* an error result has been received */
    1912           2 :     int         switched = 0;   /* times we went from writing to reading */
    1913           2 :     int         socketful = 0;  /* numsent when PQflush() first returned 1; 0 until then */
    1914             :     fd_set      in_fds;
    1915             :     fd_set      out_fds;
    1916             : 
    1917           2 :     fprintf(stderr, "uniqviol ...");
    1918             : 
    1919           2 :     PQsetnonblocking(conn, 1);
    1920             : 
    1921           2 :     paramValues[0] = paramValue0;
    1922           2 :     paramValues[1] = paramValue1;
    1923           2 :     sprintf(paramValue1, "42");
    1924             : 
    1925           2 :     res = PQexec(conn, "drop table if exists ppln_uniqviol;"
    1926             :                  "create table ppln_uniqviol(id bigint primary key, idata bigint)");
    1927           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1928           0 :         pg_fatal("failed to create table: %s", PQerrorMessage(conn));
    1929           2 :     PQclear(res);
    1930             : 
    1931           2 :     res = PQexec(conn, "begin");
    1932           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1933           0 :         pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
    1934           2 :     PQclear(res);
    1935             : 
    1936           2 :     res = PQprepare(conn, "insertion",
    1937             :                     "insert into ppln_uniqviol values ($1, $2) returning id",
    1938             :                     2, paramTypes);
    1939           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1940           0 :         pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
    1941           2 :     PQclear(res);
    1942             : 
    1943           2 :     if (PQenterPipelineMode(conn) != 1)
    1944           0 :         pg_fatal("failed to enter pipeline mode");
    1945             : 
    1946          16 :     while (!read_done)
    1947             :     {
    1948             :         /*
    1949             :          * Avoid deadlocks by reading everything the server has sent before
    1950             :          * sending anything.  (Special precaution is needed here to process
    1951             :          * PQisBusy before testing the socket for read-readiness, because the
    1952             :          * socket does not turn read-ready after "sending" queries in aborted
    1953             :          * pipeline mode.)
                     :          *
                     :          * The inner loop stops as soon as the number of processed results
                     :          * catches up with the number of queries sent; if writing has already
                     :          * finished at that point, the whole test loop is done.
    1954             :          */
    1955        1212 :         while (PQisBusy(conn) == 0)
    1956             :         {
    1957             :             bool        new_error;
    1958             : 
    1959        1202 :             if (results >= numsent)
    1960             :             {
    1961           2 :                 if (write_done)
    1962           0 :                     read_done = true;
    1963           2 :                 break;
    1964             :             }
    1965             : 
    1966        1200 :             res = PQgetResult(conn);
    1967        1200 :             new_error = process_result(conn, res, results, numsent);
    1968        1200 :             if (new_error && got_error)
    1969           0 :                 pg_fatal("got two errors");
    1970        1200 :             got_error |= new_error;
    1971        1200 :             if (results++ >= numsent - 1)
    1972             :             {
    1973           4 :                 if (write_done)
    1974           2 :                     read_done = true;
    1975           4 :                 break;
    1976             :             }
    1977             :         }
    1978             : 
    1979          16 :         if (read_done)
    1980           2 :             break;
    1981             : 
    1982         238 :         FD_ZERO(&out_fds);
    1983          14 :         FD_SET(sock, &out_fds);
    1984             : 
    1985         238 :         FD_ZERO(&in_fds);
    1986          14 :         FD_SET(sock, &in_fds);
    1987             : 
    1988          14 :         if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)  /* stop waiting for writability once writing is done */
    1989             :         {
    1990           0 :             if (errno == EINTR)
    1991           0 :                 continue;
    1992           0 :             pg_fatal("select() failed: %m");
    1993             :         }
    1994             : 
    1995          14 :         if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
    1996           0 :             pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
    1997             : 
    1998             :         /*
    1999             :          * If the socket is writable and we haven't finished sending queries,
    2000             :          * send some.
    2001             :          */
    2002          14 :         if (!write_done && FD_ISSET(sock, &out_fds))
    2003             :         {
    2004             :             for (;;)
    2005        1194 :             {
    2006             :                 int         flush;
    2007             : 
    2008             :                 /*
    2009             :                  * provoke uniqueness violation exactly once after having
    2010             :                  * switched to read mode.
                     :                  *
                     :                  * "switched" is only incremented after "socketful" has been
                     :                  * set to a nonzero numsent, so the modulo below never divides
                     :                  * by zero.  The duplicate id does not advance "ctr".
    2011             :                  */
    2012        1200 :                 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
    2013             :                 {
    2014           2 :                     sprintf(paramValue0, "%d", numsent / 2);
    2015           2 :                     fprintf(stderr, "E");
    2016           2 :                     error_sent = true;
    2017             :                 }
    2018             :                 else
    2019             :                 {
    2020        1198 :                     fprintf(stderr, ".");
    2021        1198 :                     sprintf(paramValue0, "%d", ctr++);
    2022             :                 }
    2023             : 
    2024        1200 :                 if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
    2025           0 :                     pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
    2026        1200 :                 numsent++;
    2027             : 
    2028             :                 /*
                     :                  * Are we done writing?  This requires that the duplicate id
                     :                  * has been sent.  NOTE(review): the remainder 42 looks like an
                     :                  * arbitrary stopping point -- confirm.
                     :                  */
    2029        1200 :                 if (socketful != 0 && numsent % socketful == 42 && error_sent)
    2030             :                 {
    2031           2 :                     if (PQsendFlushRequest(conn) != 1)
    2032           0 :                         pg_fatal("failed to send flush request");
    2033           2 :                     write_done = true;
    2034           2 :                     fprintf(stderr, "\ndone writing\n");
    2035           2 :                     PQflush(conn);  /* return value is not checked here */
    2036           2 :                     break;
    2037             :                 }
    2038             : 
    2039             :                 /* is the outgoing socket full? */
    2040        1198 :                 flush = PQflush(conn);
    2041        1198 :                 if (flush == -1)
    2042           0 :                     pg_fatal("failed to flush: %s", PQerrorMessage(conn));
    2043        1198 :                 if (flush == 1)
    2044             :                 {
    2045           4 :                     if (socketful == 0)
    2046           2 :                         socketful = numsent;
    2047           4 :                     fprintf(stderr, "\nswitch to reading\n");
    2048           4 :                     switched++;
    2049           4 :                     break;
    2050             :                 }
    2051             :             }
    2052             :         }
    2053             :     }
    2054             : 
    2055           2 :     if (!got_error)
    2056           0 :         pg_fatal("did not get expected error");
    2057             : 
    2058           2 :     fprintf(stderr, "ok\n");
    2059           2 : }
    2060             : 
    2061             : /*
    2062             :  * Subroutine for test_uniqviol; given a PGresult, print it out and consume
    2063             :  * the expected NULL that should follow it.
    2064             :  *
                     :  * "results" and "numsent" are used only to label the line written to
                     :  * stderr.  The result must be non-NULL and have status PGRES_FATAL_ERROR,
                     :  * PGRES_TUPLES_OK or PGRES_PIPELINE_ABORTED; anything else is reported via
                     :  * pg_fatal().  For PGRES_TUPLES_OK the first column of the first row is
                     :  * printed.  In every accepted case the result is cleared here, so the
                     :  * caller must not use it afterwards.
                     :  *
    2065             :  * Returns true if we read a fatal error message, otherwise false.
    2066             :  */
    2067             : static bool
    2068        1200 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
    2069             : {
    2070        1200 :     bool        got_error = false;
    2071             : 
    2072        1200 :     if (res == NULL)
    2073           0 :         pg_fatal("got unexpected NULL");
    2074             : 
    2075        1200 :     switch (PQresultStatus(res))
    2076             :     {
    2077           2 :         case PGRES_FATAL_ERROR:
    2078           2 :             got_error = true;
    2079           2 :             fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
    2080           2 :             PQclear(res);
    2081           2 :             consume_null_result(conn);
    2082           2 :             break;
    2083             : 
    2084         836 :         case PGRES_TUPLES_OK:
    2085         836 :             fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
    2086         836 :             PQclear(res);
    2087         836 :             consume_null_result(conn);
    2088         836 :             break;
    2089             : 
    2090         362 :         case PGRES_PIPELINE_ABORTED:
    2091         362 :             fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
    2092         362 :             PQclear(res);
    2093         362 :             consume_null_result(conn);
    2094         362 :             break;
    2095             : 
    2096           0 :         default:
    2097           0 :             pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
    2098             :     }
    2099             : 
    2100        1200 :     return got_error;
    2101             : }
    2102             : 
    2103             : /*
                     :  * Print a usage summary (invocation forms and options) to stderr.
                     :  *
                     :  * The "progname" parameter shadows the file-level variable of the same
                     :  * name; main() passes argv[0].  This function does not exit; that is left
                     :  * to the caller.
                     :  */
    2104             : static void
    2105           0 : usage(const char *progname)
    2106             : {
    2107           0 :     fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
    2108           0 :     fprintf(stderr, "Usage:\n");
    2109           0 :     fprintf(stderr, "  %s [OPTION] tests\n", progname);
    2110           0 :     fprintf(stderr, "  %s [OPTION] TESTNAME [CONNINFO]\n", progname);
    2111           0 :     fprintf(stderr, "\nOptions:\n");
    2112           0 :     fprintf(stderr, "  -t TRACEFILE       generate a libpq trace to TRACEFILE\n");
    2113           0 :     fprintf(stderr, "  -r NUMROWS         use NUMROWS as the test size\n");
    2114           0 : }
    2115             : /*
                     :  * Print the names of the available tests to stdout, one per line.  These
                     :  * are the same names that main() accepts as TESTNAME, so the two lists must
                     :  * be kept in sync when a test is added or removed.
                     :  */
    2116             : static void
    2117           2 : print_test_list(void)
    2118             : {
    2119           2 :     printf("cancel\n");
    2120           2 :     printf("disallowed_in_pipeline\n");
    2121           2 :     printf("multi_pipelines\n");
    2122           2 :     printf("nosync\n");
    2123           2 :     printf("pipeline_abort\n");
    2124           2 :     printf("pipeline_idle\n");
    2125           2 :     printf("pipelined_insert\n");
    2126           2 :     printf("prepared\n");
    2127           2 :     printf("protocol_version\n");
    2128           2 :     printf("simple_pipeline\n");
    2129           2 :     printf("singlerow\n");
    2130           2 :     printf("transaction\n");
    2131           2 :     printf("uniqviol\n");
    2132           2 : }
    2133             : /*
                     :  * Program entry point.
                     :  *
                     :  * Command line: [-r NUMROWS] [-t TRACEFILE] TESTNAME [CONNINFO]
                     :  *
                     :  * If TESTNAME is "tests", the list of test names is printed and the program
                     :  * exits with status 0 without connecting.  Otherwise a connection is made
                     :  * using CONNINFO (an empty string if omitted), lc_messages is set to "C" and
                     :  * debug_parallel_query is turned off, libpq tracing is enabled if -t was
                     :  * given (a TRACEFILE of "-" selects stdout), and the named test is run.
                     :  *
                     :  * NUMROWS defaults to 10000 and is passed only to test_pipelined_insert().
                     :  *
                     :  * Returns 0 when the test function returns.  Exits with status 1 on an
                     :  * unparsable or non-positive NUMROWS, a missing TESTNAME, or an
                     :  * unrecognized TESTNAME; a failed connection goes through exit_nicely().
                     :  *
                     :  * NOTE(review): strtol() is called with a NULL end pointer, so trailing
                     :  * characters after a leading number are not rejected, and its long result
                     :  * is stored in an int without a range check.
                     :  *
                     :  * NOTE(review): "tracefile" is not declared in this function; presumably it
                     :  * is a file-level variable declared outside this view -- confirm.
                     :  */
    2134             : int
    2135          30 : main(int argc, char **argv)
    2136             : {
    2137          30 :     const char *conninfo = "";
    2138             :     PGconn     *conn;
    2139          30 :     FILE       *trace = NULL;
    2140             :     char       *testname;
    2141          30 :     int         numrows = 10000;
    2142             :     PGresult   *res;
    2143             :     int         c;
    2144             : 
    2145         104 :     while ((c = getopt(argc, argv, "r:t:")) != -1)
    2146             :     {
    2147          44 :         switch (c)  /* no default case: any other getopt() return value is ignored */
    2148             :         {
    2149          26 :             case 'r':           /* numrows */
    2150          26 :                 errno = 0;
    2151          26 :                 numrows = strtol(optarg, NULL, 10);
    2152          26 :                 if (errno != 0 || numrows <= 0)
    2153             :                 {
    2154           0 :                     fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
    2155             :                             optarg);
    2156           0 :                     exit(1);
    2157             :                 }
    2158          26 :                 break;
    2159          18 :             case 't':           /* trace file */
    2160          18 :                 tracefile = pg_strdup(optarg);
    2161          18 :                 break;
    2162             :         }
    2163             :     }
    2164             :     /* The first non-option argument is the test name; it is required. */
    2165          30 :     if (optind < argc)
    2166             :     {
    2167          30 :         testname = pg_strdup(argv[optind]);
    2168          30 :         optind++;
    2169             :     }
    2170             :     else
    2171             :     {
    2172           0 :         usage(argv[0]);
    2173           0 :         exit(1);
    2174             :     }
    2175             :     /* "tests" is a pseudo test name: list the real ones and stop. */
    2176          30 :     if (strcmp(testname, "tests") == 0)
    2177             :     {
    2178           2 :         print_test_list();
    2179           2 :         exit(0);
    2180             :     }
    2181             :     /* An optional second argument is the connection string. */
    2182          28 :     if (optind < argc)
    2183             :     {
    2184          28 :         conninfo = pg_strdup(argv[optind]);
    2185          28 :         optind++;
    2186             :     }
    2187             : 
    2188             :     /* Make a connection to the database */
    2189          28 :     conn = PQconnectdb(conninfo);
    2190          28 :     if (PQstatus(conn) != CONNECTION_OK)
    2191             :     {
    2192           0 :         fprintf(stderr, "Connection to database failed: %s\n",
    2193             :                 PQerrorMessage(conn));
    2194           0 :         exit_nicely(conn);
    2195             :     }
    2196             : 
    2197          28 :     res = PQexec(conn, "SET lc_messages TO \"C\"");
    2198          28 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2199           0 :         pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
    2200          28 :     PQclear(res);
    2201          28 :     res = PQexec(conn, "SET debug_parallel_query = off");
    2202          28 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2203           0 :         pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
    2204          28 :     PQclear(res);
    2205             : 
    2206             :     /* Set the trace file, if requested */
    2207          28 :     if (tracefile != NULL)
    2208             :     {
    2209          18 :         if (strcmp(tracefile, "-") == 0)
    2210           0 :             trace = stdout;
    2211             :         else
    2212          18 :             trace = fopen(tracefile, "w");
    2213          18 :         if (trace == NULL)
    2214           0 :             pg_fatal("could not open file \"%s\": %m", tracefile);
    2215             : 
    2216             :         /* Make it line-buffered */
    2217          18 :         setvbuf(trace, NULL, PG_IOLBF, 0);
    2218             : 
    2219          18 :         PQtrace(conn, trace);
    2220          18 :         PQsetTraceFlags(conn,
    2221             :                         PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
    2222             :     }
    2223             :     /* Dispatch to the requested test; names match print_test_list(). */
    2224          28 :     if (strcmp(testname, "cancel") == 0)
    2225           4 :         test_cancel(conn);
    2226          24 :     else if (strcmp(testname, "disallowed_in_pipeline") == 0)
    2227           2 :         test_disallowed_in_pipeline(conn);
    2228          22 :     else if (strcmp(testname, "multi_pipelines") == 0)
    2229           2 :         test_multi_pipelines(conn);
    2230          20 :     else if (strcmp(testname, "nosync") == 0)
    2231           2 :         test_nosync(conn);
    2232          18 :     else if (strcmp(testname, "pipeline_abort") == 0)
    2233           2 :         test_pipeline_abort(conn);
    2234          16 :     else if (strcmp(testname, "pipeline_idle") == 0)
    2235           2 :         test_pipeline_idle(conn);
    2236          14 :     else if (strcmp(testname, "pipelined_insert") == 0)
    2237           2 :         test_pipelined_insert(conn, numrows);
    2238          12 :     else if (strcmp(testname, "prepared") == 0)
    2239           2 :         test_prepared(conn);
    2240          10 :     else if (strcmp(testname, "protocol_version") == 0)
    2241           2 :         test_protocol_version(conn);
    2242           8 :     else if (strcmp(testname, "simple_pipeline") == 0)
    2243           2 :         test_simple_pipeline(conn);
    2244           6 :     else if (strcmp(testname, "singlerow") == 0)
    2245           2 :         test_singlerowmode(conn);
    2246           4 :     else if (strcmp(testname, "transaction") == 0)
    2247           2 :         test_transaction(conn);
    2248           2 :     else if (strcmp(testname, "uniqviol") == 0)
    2249           2 :         test_uniqviol(conn);
    2250             :     else
    2251             :     {
    2252           0 :         fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
    2253           0 :         exit(1);
    2254             :     }
    2255             : 
    2256             :     /* close the connection to the database and cleanup */
    2257          28 :     PQfinish(conn);
    2258             : 
    2259          28 :     if (trace && trace != stdout)
    2260          18 :         fclose(trace);
    2261             : 
    2262          28 :     return 0;
    2263             : }

Generated by: LCOV version 1.16