LCOV - code coverage report
Current view: top level - src/test/modules/libpq_pipeline - libpq_pipeline.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18beta1 Lines: 910 1259 72.3 %
Date: 2025-05-09 02:15:25 Functions: 21 24 87.5 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * libpq_pipeline.c
       4             :  *      Verify libpq pipeline execution functionality
       5             :  *
       6             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *      src/test/modules/libpq_pipeline/libpq_pipeline.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres_fe.h"
      17             : 
      18             : #include <sys/select.h>
      19             : #include <sys/time.h>
      20             : 
      21             : #include "catalog/pg_type_d.h"
      22             : #include "libpq-fe.h"
      23             : #include "pg_getopt.h"
      24             : 
      25             : 
      26             : static void exit_nicely(PGconn *conn);
      27             : pg_noreturn static void pg_fatal_impl(int line, const char *fmt,...)
      28             :             pg_attribute_printf(2, 3);
      29             : static bool process_result(PGconn *conn, PGresult *res, int results,
      30             :                            int numsent);
      31             : 
      32             : static const char *const progname = "libpq_pipeline";
      33             : 
      34             : /* Options and defaults */
      35             : static char *tracefile = NULL;  /* path to PQtrace() file */
      36             : 
      37             : 
      38             : #ifdef DEBUG_OUTPUT
      39             : #define pg_debug(...)  do { fprintf(stderr, __VA_ARGS__); } while (0)
      40             : #else
      41             : #define pg_debug(...)
      42             : #endif
      43             : 
      44             : static const char *const drop_table_sql =
      45             : "DROP TABLE IF EXISTS pq_pipeline_demo";
      46             : static const char *const create_table_sql =
      47             : "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
      48             : "int8filler int8);";
      49             : static const char *const insert_sql =
      50             : "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
      51             : static const char *const insert_sql2 =
      52             : "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
      53             : 
      54             : /* max char length of an int32/64, plus sign and null terminator */
      55             : #define MAXINTLEN 12
      56             : #define MAXINT8LEN 20
      57             : 
      58             : static void
      59           0 : exit_nicely(PGconn *conn)
      60             : {
      61           0 :     PQfinish(conn);
      62           0 :     exit(1);
      63             : }
      64             : 
      65             : /*
      66             :  * The following few functions are wrapped in macros to make the reported line
      67             :  * number in an error match the line number of the invocation.
      68             :  */
      69             : 
      70             : /*
      71             :  * Print an error to stderr and terminate the program.
      72             :  */
      73             : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
      74             : pg_noreturn static void
      75           0 : pg_fatal_impl(int line, const char *fmt,...)
      76             : {
      77             :     va_list     args;
      78             : 
      79           0 :     fflush(stdout);
      80             : 
      81           0 :     fprintf(stderr, "\n%s:%d: ", progname, line);
      82           0 :     va_start(args, fmt);
      83           0 :     vfprintf(stderr, fmt, args);
      84           0 :     va_end(args);
      85             :     Assert(fmt[strlen(fmt) - 1] != '\n');
      86           0 :     fprintf(stderr, "\n");
      87           0 :     exit(1);
      88             : }
      89             : 
      90             : /*
      91             :  * Check that the query on the given connection got canceled.
      92             :  */
      93             : #define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
      94             : static void
      95          24 : confirm_query_canceled_impl(int line, PGconn *conn)
      96             : {
      97          24 :     PGresult   *res = NULL;
      98             : 
      99          24 :     res = PQgetResult(conn);
     100          24 :     if (res == NULL)
     101           0 :         pg_fatal_impl(line, "PQgetResult returned null: %s",
     102             :                       PQerrorMessage(conn));
     103          24 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
     104           0 :         pg_fatal_impl(line, "query did not fail when it was expected");
     105          24 :     if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
     106           0 :         pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
     107             :                       PQerrorMessage(conn));
     108          24 :     PQclear(res);
     109             : 
     110          24 :     while (PQisBusy(conn))
     111           0 :         PQconsumeInput(conn);
     112          24 : }
     113             : 
     114             : /*
     115             :  * Using monitorConn, query pg_stat_activity to see that the connection with
     116             :  * the given PID is either in the given state, or waiting on the given event
     117             :  * (only one of them can be given).
     118             :  */
     119             : static void
     120          48 : wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
     121             :                           char *state, char *event)
     122             : {
     123          48 :     const Oid   paramTypes[] = {INT4OID, TEXTOID};
     124             :     const char *paramValues[2];
     125          48 :     char       *pidstr = psprintf("%d", procpid);
     126             : 
     127             :     Assert((state == NULL) ^ (event == NULL));
     128             : 
     129          48 :     paramValues[0] = pidstr;
     130          48 :     paramValues[1] = state ? state : event;
     131             : 
     132             :     while (true)
     133           0 :     {
     134             :         PGresult   *res;
     135             :         char       *value;
     136             : 
     137          48 :         if (state != NULL)
     138          24 :             res = PQexecParams(monitorConn,
     139             :                                "SELECT count(*) FROM pg_stat_activity WHERE "
     140             :                                "pid = $1 AND state = $2",
     141             :                                2, paramTypes, paramValues, NULL, NULL, 0);
     142             :         else
     143          24 :             res = PQexecParams(monitorConn,
     144             :                                "SELECT count(*) FROM pg_stat_activity WHERE "
     145             :                                "pid = $1 AND wait_event = $2",
     146             :                                2, paramTypes, paramValues, NULL, NULL, 0);
     147             : 
     148          48 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     149           0 :             pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
     150          48 :         if (PQntuples(res) != 1)
     151           0 :             pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
     152          48 :         if (PQnfields(res) != 1)
     153           0 :             pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
     154          48 :         value = PQgetvalue(res, 0, 0);
     155          48 :         if (strcmp(value, "0") != 0)
     156             :         {
     157          48 :             PQclear(res);
     158          48 :             break;
     159             :         }
     160           0 :         PQclear(res);
     161             : 
     162             :         /* wait 10ms before polling again */
     163           0 :         pg_usleep(10000);
     164             :     }
     165             : 
     166          48 :     pfree(pidstr);
     167          48 : }
     168             : 
     169             : #define send_cancellable_query(conn, monitorConn) \
     170             :     send_cancellable_query_impl(__LINE__, conn, monitorConn)
     171             : static void
     172          24 : send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
     173             : {
     174             :     const char *env_wait;
     175          24 :     const Oid   paramTypes[1] = {INT4OID};
     176             : 
     177             :     /*
     178             :      * Wait for the connection to be idle, so that our check for an active
     179             :      * connection below is reliable, instead of possibly seeing an outdated
     180             :      * state.
     181             :      */
     182          24 :     wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
     183             : 
     184          24 :     env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
     185          24 :     if (env_wait == NULL)
     186          24 :         env_wait = "180";
     187             : 
     188          24 :     if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
     189             :                           &env_wait, NULL, NULL, 0) != 1)
     190           0 :         pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
     191             : 
     192             :     /*
     193             :      * Wait for the sleep to be active, because if the query is not running
     194             :      * yet, the cancel request that we send won't have any effect.
     195             :      */
     196          24 :     wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
     197          24 : }
     198             : 
     199             : /*
     200             :  * Create a new connection with the same conninfo as the given one.
     201             :  */
     202             : static PGconn *
     203           4 : copy_connection(PGconn *conn)
     204             : {
     205             :     PGconn     *copyConn;
     206           4 :     PQconninfoOption *opts = PQconninfo(conn);
     207             :     const char **keywords;
     208             :     const char **vals;
     209           4 :     int         nopts = 0;
     210             :     int         i;
     211             : 
     212         204 :     for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
     213         200 :         nopts++;
     214           4 :     nopts++;                    /* for the NULL terminator */
     215             : 
     216           4 :     keywords = pg_malloc(sizeof(char *) * nopts);
     217           4 :     vals = pg_malloc(sizeof(char *) * nopts);
     218             : 
     219           4 :     i = 0;
     220         204 :     for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
     221             :     {
     222         200 :         if (opt->val)
     223             :         {
     224          80 :             keywords[i] = opt->keyword;
     225          80 :             vals[i] = opt->val;
     226          80 :             i++;
     227             :         }
     228             :     }
     229           4 :     keywords[i] = vals[i] = NULL;
     230             : 
     231           4 :     copyConn = PQconnectdbParams(keywords, vals, false);
     232             : 
     233           4 :     if (PQstatus(copyConn) != CONNECTION_OK)
     234           0 :         pg_fatal("Connection to database failed: %s",
     235             :                  PQerrorMessage(copyConn));
     236             : 
     237           4 :     return copyConn;
     238             : }
     239             : 
     240             : /*
     241             :  * Test query cancellation routines
     242             :  */
     243             : static void
     244           4 : test_cancel(PGconn *conn)
     245             : {
     246             :     PGcancel   *cancel;
     247             :     PGcancelConn *cancelConn;
     248             :     PGconn     *monitorConn;
     249             :     char        errorbuf[256];
     250             : 
     251           4 :     fprintf(stderr, "test cancellations... ");
     252             : 
     253           4 :     if (PQsetnonblocking(conn, 1) != 0)
     254           0 :         pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
     255             : 
     256             :     /*
     257             :      * Make a separate connection to the database to monitor the query on the
     258             :      * main connection.
     259             :      */
     260           4 :     monitorConn = copy_connection(conn);
     261             :     Assert(PQstatus(monitorConn) == CONNECTION_OK);
     262             : 
     263             :     /* test PQcancel */
     264           4 :     send_cancellable_query(conn, monitorConn);
     265           4 :     cancel = PQgetCancel(conn);
     266           4 :     if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
     267           0 :         pg_fatal("failed to run PQcancel: %s", errorbuf);
     268           4 :     confirm_query_canceled(conn);
     269             : 
     270             :     /* PGcancel object can be reused for the next query */
     271           4 :     send_cancellable_query(conn, monitorConn);
     272           4 :     if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
     273           0 :         pg_fatal("failed to run PQcancel: %s", errorbuf);
     274           4 :     confirm_query_canceled(conn);
     275             : 
     276           4 :     PQfreeCancel(cancel);
     277             : 
     278             :     /* test PQrequestCancel */
     279           4 :     send_cancellable_query(conn, monitorConn);
     280           4 :     if (!PQrequestCancel(conn))
     281           0 :         pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
     282           4 :     confirm_query_canceled(conn);
     283             : 
     284             :     /* test PQcancelBlocking */
     285           4 :     send_cancellable_query(conn, monitorConn);
     286           4 :     cancelConn = PQcancelCreate(conn);
     287           4 :     if (!PQcancelBlocking(cancelConn))
     288           0 :         pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
     289           4 :     confirm_query_canceled(conn);
     290           4 :     PQcancelFinish(cancelConn);
     291             : 
     292             :     /* test PQcancelCreate and then polling with PQcancelPoll */
     293           4 :     send_cancellable_query(conn, monitorConn);
     294           4 :     cancelConn = PQcancelCreate(conn);
     295           4 :     if (!PQcancelStart(cancelConn))
     296           0 :         pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     297             :     while (true)
     298           4 :     {
     299             :         struct timeval tv;
     300             :         fd_set      input_mask;
     301             :         fd_set      output_mask;
     302           8 :         PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
     303           8 :         int         sock = PQcancelSocket(cancelConn);
     304             : 
     305           8 :         if (pollres == PGRES_POLLING_OK)
     306           4 :             break;
     307             : 
     308           4 :         FD_ZERO(&input_mask);
     309           4 :         FD_ZERO(&output_mask);
     310           4 :         switch (pollres)
     311             :         {
     312           4 :             case PGRES_POLLING_READING:
     313             :                 pg_debug("polling for reads\n");
     314           4 :                 FD_SET(sock, &input_mask);
     315           4 :                 break;
     316           0 :             case PGRES_POLLING_WRITING:
     317             :                 pg_debug("polling for writes\n");
     318           0 :                 FD_SET(sock, &output_mask);
     319           0 :                 break;
     320           0 :             default:
     321           0 :                 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     322             :         }
     323             : 
     324           4 :         if (sock < 0)
     325           0 :             pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
     326             : 
     327           4 :         tv.tv_sec = 3;
     328           4 :         tv.tv_usec = 0;
     329             : 
     330             :         while (true)
     331             :         {
     332           4 :             if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
     333             :             {
     334           0 :                 if (errno == EINTR)
     335           0 :                     continue;
     336           0 :                 pg_fatal("select() failed: %m");
     337             :             }
     338           4 :             break;
     339             :         }
     340             :     }
     341           4 :     if (PQcancelStatus(cancelConn) != CONNECTION_OK)
     342           0 :         pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
     343           4 :     confirm_query_canceled(conn);
     344             : 
     345             :     /*
     346             :      * test PQcancelReset works on the cancel connection and it can be reused
     347             :      * afterwards
     348             :      */
     349           4 :     PQcancelReset(cancelConn);
     350             : 
     351           4 :     send_cancellable_query(conn, monitorConn);
     352           4 :     if (!PQcancelStart(cancelConn))
     353           0 :         pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     354             :     while (true)
     355           4 :     {
     356             :         struct timeval tv;
     357             :         fd_set      input_mask;
     358             :         fd_set      output_mask;
     359           8 :         PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
     360           8 :         int         sock = PQcancelSocket(cancelConn);
     361             : 
     362           8 :         if (pollres == PGRES_POLLING_OK)
     363           4 :             break;
     364             : 
     365           4 :         FD_ZERO(&input_mask);
     366           4 :         FD_ZERO(&output_mask);
     367           4 :         switch (pollres)
     368             :         {
     369           4 :             case PGRES_POLLING_READING:
     370             :                 pg_debug("polling for reads\n");
     371           4 :                 FD_SET(sock, &input_mask);
     372           4 :                 break;
     373           0 :             case PGRES_POLLING_WRITING:
     374             :                 pg_debug("polling for writes\n");
     375           0 :                 FD_SET(sock, &output_mask);
     376           0 :                 break;
     377           0 :             default:
     378           0 :                 pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
     379             :         }
     380             : 
     381           4 :         if (sock < 0)
     382           0 :             pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
     383             : 
     384           4 :         tv.tv_sec = 3;
     385           4 :         tv.tv_usec = 0;
     386             : 
     387             :         while (true)
     388             :         {
     389           4 :             if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
     390             :             {
     391           0 :                 if (errno == EINTR)
     392           0 :                     continue;
     393           0 :                 pg_fatal("select() failed: %m");
     394             :             }
     395           4 :             break;
     396             :         }
     397             :     }
     398           4 :     if (PQcancelStatus(cancelConn) != CONNECTION_OK)
     399           0 :         pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
     400           4 :     confirm_query_canceled(conn);
     401             : 
     402           4 :     PQcancelFinish(cancelConn);
     403             : 
     404           4 :     fprintf(stderr, "ok\n");
     405           4 : }
     406             : 
     407             : static void
     408           2 : test_disallowed_in_pipeline(PGconn *conn)
     409             : {
     410           2 :     PGresult   *res = NULL;
     411             : 
     412           2 :     fprintf(stderr, "test error cases... ");
     413             : 
     414           2 :     if (PQisnonblocking(conn))
     415           0 :         pg_fatal("Expected blocking connection mode");
     416             : 
     417           2 :     if (PQenterPipelineMode(conn) != 1)
     418           0 :         pg_fatal("Unable to enter pipeline mode");
     419             : 
     420           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     421           0 :         pg_fatal("Pipeline mode not activated properly");
     422             : 
     423             :     /* PQexec should fail in pipeline mode */
     424           2 :     res = PQexec(conn, "SELECT 1");
     425           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
     426           0 :         pg_fatal("PQexec should fail in pipeline mode but succeeded");
     427           2 :     if (strcmp(PQerrorMessage(conn),
     428             :                "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
     429           0 :         pg_fatal("did not get expected error message; got: \"%s\"",
     430             :                  PQerrorMessage(conn));
     431             : 
     432             :     /* PQsendQuery should fail in pipeline mode */
     433           2 :     if (PQsendQuery(conn, "SELECT 1") != 0)
     434           0 :         pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
     435           2 :     if (strcmp(PQerrorMessage(conn),
     436             :                "PQsendQuery not allowed in pipeline mode\n") != 0)
     437           0 :         pg_fatal("did not get expected error message; got: \"%s\"",
     438             :                  PQerrorMessage(conn));
     439             : 
     440             :     /* Entering pipeline mode when already in pipeline mode is OK */
     441           2 :     if (PQenterPipelineMode(conn) != 1)
     442           0 :         pg_fatal("re-entering pipeline mode should be a no-op but failed");
     443             : 
     444           2 :     if (PQisBusy(conn) != 0)
     445           0 :         pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
     446             : 
     447             :     /* ok, back to normal command mode */
     448           2 :     if (PQexitPipelineMode(conn) != 1)
     449           0 :         pg_fatal("couldn't exit idle empty pipeline mode");
     450             : 
     451           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     452           0 :         pg_fatal("Pipeline mode not terminated properly");
     453             : 
     454             :     /* exiting pipeline mode when not in pipeline mode should be a no-op */
     455           2 :     if (PQexitPipelineMode(conn) != 1)
     456           0 :         pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
     457             : 
     458             :     /* can now PQexec again */
     459           2 :     res = PQexec(conn, "SELECT 1");
     460           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     461           0 :         pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
     462             :                  PQerrorMessage(conn));
     463             : 
     464           2 :     fprintf(stderr, "ok\n");
     465           2 : }
     466             : 
     467             : static void
     468           2 : test_multi_pipelines(PGconn *conn)
     469             : {
     470           2 :     PGresult   *res = NULL;
     471           2 :     const char *dummy_params[1] = {"1"};
     472           2 :     Oid         dummy_param_oids[1] = {INT4OID};
     473             : 
     474           2 :     fprintf(stderr, "multi pipeline... ");
     475             : 
     476             :     /*
     477             :      * Queue up a couple of small pipelines and process each without returning
     478             :      * to command mode first.
     479             :      */
     480           2 :     if (PQenterPipelineMode(conn) != 1)
     481           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     482             : 
     483             :     /* first pipeline */
     484           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     485             :                           dummy_params, NULL, NULL, 0) != 1)
     486           0 :         pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
     487             : 
     488           2 :     if (PQpipelineSync(conn) != 1)
     489           0 :         pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
     490             : 
     491             :     /* second pipeline */
     492           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     493             :                           dummy_params, NULL, NULL, 0) != 1)
     494           0 :         pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
     495             : 
     496             :     /* Skip flushing once. */
     497           2 :     if (PQsendPipelineSync(conn) != 1)
     498           0 :         pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
     499             : 
     500             :     /* third pipeline */
     501           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     502             :                           dummy_params, NULL, NULL, 0) != 1)
     503           0 :         pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
     504             : 
     505           2 :     if (PQpipelineSync(conn) != 1)
     506           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     507             : 
     508             :     /* OK, start processing the results */
     509             : 
     510             :     /* first pipeline */
     511             : 
     512           2 :     res = PQgetResult(conn);
     513           2 :     if (res == NULL)
     514           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     515             :                  PQerrorMessage(conn));
     516             : 
     517           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     518           0 :         pg_fatal("Unexpected result code %s from first pipeline item",
     519             :                  PQresStatus(PQresultStatus(res)));
     520           2 :     PQclear(res);
     521           2 :     res = NULL;
     522             : 
     523           2 :     if (PQgetResult(conn) != NULL)
     524           0 :         pg_fatal("PQgetResult returned something extra after first result");
     525             : 
     526           2 :     if (PQexitPipelineMode(conn) != 0)
     527           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
     528             : 
     529           2 :     res = PQgetResult(conn);
     530           2 :     if (res == NULL)
     531           0 :         pg_fatal("PQgetResult returned null when sync result expected: %s",
     532             :                  PQerrorMessage(conn));
     533             : 
     534           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     535           0 :         pg_fatal("Unexpected result code %s instead of sync result, error: %s",
     536             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     537           2 :     PQclear(res);
     538             : 
     539             :     /* second pipeline */
     540             : 
     541           2 :     res = PQgetResult(conn);
     542           2 :     if (res == NULL)
     543           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     544             :                  PQerrorMessage(conn));
     545             : 
     546           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     547           0 :         pg_fatal("Unexpected result code %s from second pipeline item",
     548             :                  PQresStatus(PQresultStatus(res)));
     549           2 :     PQclear(res);
     550           2 :     res = NULL;
     551             : 
     552           2 :     if (PQgetResult(conn) != NULL)
     553           0 :         pg_fatal("PQgetResult returned something extra after first result");
     554             : 
     555           2 :     if (PQexitPipelineMode(conn) != 0)
     556           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
     557             : 
     558           2 :     res = PQgetResult(conn);
     559           2 :     if (res == NULL)
     560           0 :         pg_fatal("PQgetResult returned null when sync result expected: %s",
     561             :                  PQerrorMessage(conn));
     562             : 
     563           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     564           0 :         pg_fatal("Unexpected result code %s instead of sync result, error: %s",
     565             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     566           2 :     PQclear(res);
     567             : 
     568             :     /* third pipeline */
     569             : 
     570           2 :     res = PQgetResult(conn);
     571           2 :     if (res == NULL)
     572           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     573             :                  PQerrorMessage(conn));
     574             : 
     575           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     576           0 :         pg_fatal("Unexpected result code %s from third pipeline item",
     577             :                  PQresStatus(PQresultStatus(res)));
     578             : 
     579           2 :     res = PQgetResult(conn);
     580           2 :     if (res != NULL)
     581           0 :         pg_fatal("Expected null result, got %s",
     582             :                  PQresStatus(PQresultStatus(res)));
     583             : 
     584           2 :     res = PQgetResult(conn);
     585           2 :     if (res == NULL)
     586           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     587             :                  PQerrorMessage(conn));
     588             : 
     589           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     590           0 :         pg_fatal("Unexpected result code %s from second pipeline sync",
     591             :                  PQresStatus(PQresultStatus(res)));
     592             : 
     593             :     /* We're still in pipeline mode ... */
     594           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     595           0 :         pg_fatal("Fell out of pipeline mode somehow");
     596             : 
     597             :     /* until we end it, which we can safely do now */
     598           2 :     if (PQexitPipelineMode(conn) != 1)
     599           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     600             :                  PQerrorMessage(conn));
     601             : 
     602           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     603           0 :         pg_fatal("exiting pipeline mode didn't seem to work");
     604             : 
     605           2 :     fprintf(stderr, "ok\n");
     606           2 : }
     607             : 
     608             : /*
     609             :  * Test behavior when a pipeline dispatches a number of commands that are
     610             :  * not flushed by a sync point.
     611             :  */
     612             : static void
     613           2 : test_nosync(PGconn *conn)
     614             : {
     615           2 :     int         numqueries = 10;
     616           2 :     int         results = 0;
     617           2 :     int         sock = PQsocket(conn);
     618             : 
     619           2 :     fprintf(stderr, "nosync... ");
     620             : 
     621           2 :     if (sock < 0)
     622           0 :         pg_fatal("invalid socket");
     623             : 
     624           2 :     if (PQenterPipelineMode(conn) != 1)
     625           0 :         pg_fatal("could not enter pipeline mode");
     626          22 :     for (int i = 0; i < numqueries; i++)
     627             :     {
     628             :         fd_set      input_mask;
     629             :         struct timeval tv;
     630             : 
     631          20 :         if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
     632             :                               0, NULL, NULL, NULL, NULL, 0) != 1)
     633           0 :             pg_fatal("error sending select: %s", PQerrorMessage(conn));
     634          20 :         PQflush(conn);
     635             : 
     636             :         /*
     637             :          * If the server has written anything to us, read (some of) it now.
     638             :          */
     639          20 :         FD_ZERO(&input_mask);
     640          20 :         FD_SET(sock, &input_mask);
     641          20 :         tv.tv_sec = 0;
     642          20 :         tv.tv_usec = 0;
     643          20 :         if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
     644             :         {
     645           0 :             fprintf(stderr, "select() failed: %m\n");
     646           0 :             exit_nicely(conn);
     647             :         }
     648          20 :         if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
     649           0 :             pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
     650             :     }
     651             : 
     652             :     /* tell server to flush its output buffer */
     653           2 :     if (PQsendFlushRequest(conn) != 1)
     654           0 :         pg_fatal("failed to send flush request");
     655           2 :     PQflush(conn);
     656             : 
     657             :     /* Now read all results */
     658             :     for (;;)
     659          18 :     {
     660             :         PGresult   *res;
     661             : 
     662          20 :         res = PQgetResult(conn);
     663             : 
     664             :         /* NULL results are only expected after TUPLES_OK */
     665          20 :         if (res == NULL)
     666           0 :             pg_fatal("got unexpected NULL result after %d results", results);
     667             : 
     668             :         /* We expect exactly one TUPLES_OK result for each query we sent */
     669          20 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
     670             :         {
     671             :             PGresult   *res2;
     672             : 
     673             :             /* and one NULL result should follow each */
     674          20 :             res2 = PQgetResult(conn);
     675          20 :             if (res2 != NULL)
     676           0 :                 pg_fatal("expected NULL, got %s",
     677             :                          PQresStatus(PQresultStatus(res2)));
     678          20 :             PQclear(res);
     679          20 :             results++;
     680             : 
     681             :             /* if we're done, we're done */
     682          20 :             if (results == numqueries)
     683           2 :                 break;
     684             : 
     685          18 :             continue;
     686             :         }
     687             : 
     688             :         /* anything else is unexpected */
     689           0 :         pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
     690             :     }
     691             : 
     692           2 :     fprintf(stderr, "ok\n");
     693           2 : }
     694             : 
     695             : /*
     696             :  * When an operation in a pipeline fails the rest of the pipeline is flushed. We
     697             :  * still have to get results for each pipeline item, but the item will just be
     698             :  * a PGRES_PIPELINE_ABORTED code.
     699             :  *
     700             :  * This intentionally doesn't use a transaction to wrap the pipeline. You should
     701             :  * usually use an xact, but in this case we want to observe the effects of each
     702             :  * statement.
     703             :  */
     704             : static void
     705           2 : test_pipeline_abort(PGconn *conn)
     706             : {
     707           2 :     PGresult   *res = NULL;
     708           2 :     const char *dummy_params[1] = {"1"};
     709           2 :     Oid         dummy_param_oids[1] = {INT4OID};
     710             :     int         i;
     711             :     int         gotrows;
     712             :     bool        goterror;
     713             : 
     714           2 :     fprintf(stderr, "aborted pipeline... ");
     715             : 
     716           2 :     res = PQexec(conn, drop_table_sql);
     717           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     718           0 :         pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
     719             : 
     720           2 :     res = PQexec(conn, create_table_sql);
     721           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     722           0 :         pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
     723             : 
     724             :     /*
     725             :      * Queue up a couple of small pipelines and process each without returning
     726             :      * to command mode first. Make sure the second operation in the first
     727             :      * pipeline ERRORs.
     728             :      */
     729           2 :     if (PQenterPipelineMode(conn) != 1)
     730           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     731             : 
     732           2 :     dummy_params[0] = "1";
     733           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     734             :                           dummy_params, NULL, NULL, 0) != 1)
     735           0 :         pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
     736             : 
     737           2 :     if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
     738             :                           1, dummy_param_oids, dummy_params,
     739             :                           NULL, NULL, 0) != 1)
     740           0 :         pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
     741             : 
     742           2 :     dummy_params[0] = "2";
     743           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     744             :                           dummy_params, NULL, NULL, 0) != 1)
     745           0 :         pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
     746             : 
     747           2 :     if (PQpipelineSync(conn) != 1)
     748           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     749             : 
     750           2 :     dummy_params[0] = "3";
     751           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     752             :                           dummy_params, NULL, NULL, 0) != 1)
     753           0 :         pg_fatal("dispatching second-pipeline insert failed: %s",
     754             :                  PQerrorMessage(conn));
     755             : 
     756           2 :     if (PQpipelineSync(conn) != 1)
     757           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     758             : 
     759             :     /*
     760             :      * OK, start processing the pipeline results.
     761             :      *
     762             :      * We should get a command-ok for the first query, then a fatal error and
     763             :      * a pipeline aborted message for the second insert, a pipeline-end, then
     764             :      * a command-ok and a pipeline-ok for the second pipeline operation.
     765             :      */
     766           2 :     res = PQgetResult(conn);
     767           2 :     if (res == NULL)
     768           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     769           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     770           0 :         pg_fatal("Unexpected result status %s: %s",
     771             :                  PQresStatus(PQresultStatus(res)),
     772             :                  PQresultErrorMessage(res));
     773           2 :     PQclear(res);
     774             : 
     775             :     /* NULL result to signal end-of-results for this command */
     776           2 :     if ((res = PQgetResult(conn)) != NULL)
     777           0 :         pg_fatal("Expected null result, got %s",
     778             :                  PQresStatus(PQresultStatus(res)));
     779             : 
     780             :     /* Second query caused error, so we expect an error next */
     781           2 :     res = PQgetResult(conn);
     782           2 :     if (res == NULL)
     783           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     784           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
     785           0 :         pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
     786             :                  PQresStatus(PQresultStatus(res)));
     787           2 :     PQclear(res);
     788             : 
     789             :     /* NULL result to signal end-of-results for this command */
     790           2 :     if ((res = PQgetResult(conn)) != NULL)
     791           0 :         pg_fatal("Expected null result, got %s",
     792             :                  PQresStatus(PQresultStatus(res)));
     793             : 
     794             :     /*
     795             :      * pipeline should now be aborted.
     796             :      *
     797             :      * Note that we could still queue more queries at this point if we wanted;
     798             :      * they'd get added to a new third pipeline since we've already sent a
     799             :      * second. The aborted flag relates only to the pipeline being received.
     800             :      */
     801           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
     802           0 :         pg_fatal("pipeline should be flagged as aborted but isn't");
     803             : 
     804             :     /* third query in pipeline, the second insert */
     805           2 :     res = PQgetResult(conn);
     806           2 :     if (res == NULL)
     807           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     808           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
     809           0 :         pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
     810             :                  PQresStatus(PQresultStatus(res)));
     811           2 :     PQclear(res);
     812             : 
     813             :     /* NULL result to signal end-of-results for this command */
     814           2 :     if ((res = PQgetResult(conn)) != NULL)
     815           0 :         pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
     816             : 
     817           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
     818           0 :         pg_fatal("pipeline should be flagged as aborted but isn't");
     819             : 
     820             :     /* Ensure we're still in pipeline */
     821           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     822           0 :         pg_fatal("Fell out of pipeline mode somehow");
     823             : 
     824             :     /*
     825             :      * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
     826             :      *
     827             :      * (This is so clients know to start processing results normally again and
     828             :      * can tell the difference between skipped commands and the sync.)
     829             :      */
     830           2 :     res = PQgetResult(conn);
     831           2 :     if (res == NULL)
     832           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     833           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     834           0 :         pg_fatal("Unexpected result code from first pipeline sync\n"
     835             :                  "Expected PGRES_PIPELINE_SYNC, got %s",
     836             :                  PQresStatus(PQresultStatus(res)));
     837           2 :     PQclear(res);
     838             : 
     839           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
     840           0 :         pg_fatal("sync should've cleared the aborted flag but didn't");
     841             : 
     842             :     /* We're still in pipeline mode... */
     843           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     844           0 :         pg_fatal("Fell out of pipeline mode somehow");
     845             : 
     846             :     /* the insert from the second pipeline */
     847           2 :     res = PQgetResult(conn);
     848           2 :     if (res == NULL)
     849           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     850           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     851           0 :         pg_fatal("Unexpected result code %s from first item in second pipeline",
     852             :                  PQresStatus(PQresultStatus(res)));
     853           2 :     PQclear(res);
     854             : 
     855             :     /* Read the NULL result at the end of the command */
     856           2 :     if ((res = PQgetResult(conn)) != NULL)
     857           0 :         pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
     858             : 
     859             :     /* the second pipeline sync */
     860           2 :     if ((res = PQgetResult(conn)) == NULL)
     861           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     862           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     863           0 :         pg_fatal("Unexpected result code %s from second pipeline sync",
     864             :                  PQresStatus(PQresultStatus(res)));
     865           2 :     PQclear(res);
     866             : 
     867           2 :     if ((res = PQgetResult(conn)) != NULL)
     868           0 :         pg_fatal("Expected null result, got %s: %s",
     869             :                  PQresStatus(PQresultStatus(res)),
     870             :                  PQerrorMessage(conn));
     871             : 
     872             :     /* Try to send two queries in one command */
     873           2 :     if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
     874           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
     875           2 :     if (PQpipelineSync(conn) != 1)
     876           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     877           2 :     goterror = false;
     878           4 :     while ((res = PQgetResult(conn)) != NULL)
     879             :     {
     880           2 :         switch (PQresultStatus(res))
     881             :         {
     882           2 :             case PGRES_FATAL_ERROR:
     883           2 :                 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
     884           0 :                     pg_fatal("expected error about multiple commands, got %s",
     885             :                              PQerrorMessage(conn));
     886           2 :                 printf("got expected %s", PQerrorMessage(conn));
     887           2 :                 goterror = true;
     888           2 :                 break;
     889           0 :             default:
     890           0 :                 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
     891             :                 break;
     892             :         }
     893             :     }
     894           2 :     if (!goterror)
     895           0 :         pg_fatal("did not get cannot-insert-multiple-commands error");
     896           2 :     res = PQgetResult(conn);
     897           2 :     if (res == NULL)
     898           0 :         pg_fatal("got NULL result");
     899           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     900           0 :         pg_fatal("Unexpected result code %s from pipeline sync",
     901             :                  PQresStatus(PQresultStatus(res)));
     902           2 :     fprintf(stderr, "ok\n");
     903             : 
     904             :     /* Test single-row mode with an error partways */
     905           2 :     if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
     906             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
     907           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
     908           2 :     if (PQpipelineSync(conn) != 1)
     909           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     910           2 :     PQsetSingleRowMode(conn);
     911           2 :     goterror = false;
     912           2 :     gotrows = 0;
     913          10 :     while ((res = PQgetResult(conn)) != NULL)
     914             :     {
     915           8 :         switch (PQresultStatus(res))
     916             :         {
     917           6 :             case PGRES_SINGLE_TUPLE:
     918           6 :                 printf("got row: %s\n", PQgetvalue(res, 0, 0));
     919           6 :                 gotrows++;
     920           6 :                 break;
     921           2 :             case PGRES_FATAL_ERROR:
     922           2 :                 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
     923           0 :                     pg_fatal("expected division-by-zero, got: %s (%s)",
     924             :                              PQerrorMessage(conn),
     925             :                              PQresultErrorField(res, PG_DIAG_SQLSTATE));
     926           2 :                 printf("got expected division-by-zero\n");
     927           2 :                 goterror = true;
     928           2 :                 break;
     929           0 :             default:
     930           0 :                 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
     931             :         }
     932           8 :         PQclear(res);
     933             :     }
     934           2 :     if (!goterror)
     935           0 :         pg_fatal("did not get division-by-zero error");
     936           2 :     if (gotrows != 3)
     937           0 :         pg_fatal("did not get three rows");
     938             :     /* the third pipeline sync */
     939           2 :     if ((res = PQgetResult(conn)) == NULL)
     940           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     941           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     942           0 :         pg_fatal("Unexpected result code %s from third pipeline sync",
     943             :                  PQresStatus(PQresultStatus(res)));
     944           2 :     PQclear(res);
     945             : 
     946             :     /* We're still in pipeline mode... */
     947           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     948           0 :         pg_fatal("Fell out of pipeline mode somehow");
     949             : 
     950             :     /* until we end it, which we can safely do now */
     951           2 :     if (PQexitPipelineMode(conn) != 1)
     952           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     953             :                  PQerrorMessage(conn));
     954             : 
     955           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     956           0 :         pg_fatal("exiting pipeline mode didn't seem to work");
     957             : 
     958             :     /*-
     959             :      * Since we fired the pipelines off without a surrounding xact, the results
     960             :      * should be:
     961             :      *
     962             :      * - Implicit xact started by server around 1st pipeline
     963             :      * - First insert applied
     964             :      * - Second statement aborted xact
     965             :      * - Third insert skipped
     966             :      * - Sync rolled back first implicit xact
     967             :      * - Implicit xact created by server around 2nd pipeline
     968             :      * - insert applied from 2nd pipeline
     969             :      * - Sync commits 2nd xact
     970             :      *
     971             :      * So we should only have the value 3 that we inserted.
     972             :      */
     973           2 :     res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
     974             : 
     975           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     976           0 :         pg_fatal("Expected tuples, got %s: %s",
     977             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     978           2 :     if (PQntuples(res) != 1)
     979           0 :         pg_fatal("expected 1 result, got %d", PQntuples(res));
     980           4 :     for (i = 0; i < PQntuples(res); i++)
     981             :     {
     982           2 :         const char *val = PQgetvalue(res, i, 0);
     983             : 
     984           2 :         if (strcmp(val, "3") != 0)
     985           0 :             pg_fatal("expected only insert with value 3, got %s", val);
     986             :     }
     987             : 
     988           2 :     PQclear(res);
     989             : 
     990           2 :     fprintf(stderr, "ok\n");
     991           2 : }
     992             : 
     993             : /* State machine enum for test_pipelined_insert */
     994             : enum PipelineInsertStep
     995             : {
     996             :     BI_BEGIN_TX,
     997             :     BI_DROP_TABLE,
     998             :     BI_CREATE_TABLE,
     999             :     BI_PREPARE,
    1000             :     BI_INSERT_ROWS,
    1001             :     BI_COMMIT_TX,
    1002             :     BI_SYNC,
    1003             :     BI_DONE,
    1004             : };
    1005             : 
    1006             : static void
    1007           2 : test_pipelined_insert(PGconn *conn, int n_rows)
    1008             : {
    1009           2 :     Oid         insert_param_oids[2] = {INT4OID, INT8OID};
    1010             :     const char *insert_params[2];
    1011             :     char        insert_param_0[MAXINTLEN];
    1012             :     char        insert_param_1[MAXINT8LEN];
    1013           2 :     enum PipelineInsertStep send_step = BI_BEGIN_TX,
    1014           2 :                 recv_step = BI_BEGIN_TX;
    1015             :     int         rows_to_send,
    1016             :                 rows_to_receive;
    1017             : 
    1018           2 :     insert_params[0] = insert_param_0;
    1019           2 :     insert_params[1] = insert_param_1;
    1020             : 
    1021           2 :     rows_to_send = rows_to_receive = n_rows;
    1022             : 
    1023             :     /*
    1024             :      * Do a pipelined insert into a table created at the start of the pipeline
    1025             :      */
    1026           2 :     if (PQenterPipelineMode(conn) != 1)
    1027           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1028             : 
    1029           8 :     while (send_step != BI_PREPARE)
    1030             :     {
    1031             :         const char *sql;
    1032             : 
    1033           6 :         switch (send_step)
    1034             :         {
    1035           2 :             case BI_BEGIN_TX:
    1036           2 :                 sql = "BEGIN TRANSACTION";
    1037           2 :                 send_step = BI_DROP_TABLE;
    1038           2 :                 break;
    1039             : 
    1040           2 :             case BI_DROP_TABLE:
    1041           2 :                 sql = drop_table_sql;
    1042           2 :                 send_step = BI_CREATE_TABLE;
    1043           2 :                 break;
    1044             : 
    1045           2 :             case BI_CREATE_TABLE:
    1046           2 :                 sql = create_table_sql;
    1047           2 :                 send_step = BI_PREPARE;
    1048           2 :                 break;
    1049             : 
    1050           0 :             default:
    1051           0 :                 pg_fatal("invalid state");
    1052             :                 sql = NULL;     /* keep compiler quiet */
    1053             :         }
    1054             : 
    1055             :         pg_debug("sending: %s\n", sql);
    1056           6 :         if (PQsendQueryParams(conn, sql,
    1057             :                               0, NULL, NULL, NULL, NULL, 0) != 1)
    1058           0 :             pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
    1059             :     }
    1060             : 
    1061             :     Assert(send_step == BI_PREPARE);
    1062             :     pg_debug("sending: %s\n", insert_sql2);
    1063           2 :     if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
    1064           0 :         pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
    1065           2 :     send_step = BI_INSERT_ROWS;
    1066             : 
    1067             :     /*
    1068             :      * Now we start inserting. We'll be sending enough data that we could fill
    1069             :      * our output buffer, so to avoid deadlocking we need to enter nonblocking
    1070             :      * mode and consume input while we send more output. As results of each
    1071             :      * query are processed we should pop them to allow processing of the next
    1072             :      * query. There's no need to finish the pipeline before processing
    1073             :      * results.
    1074             :      */
    1075           2 :     if (PQsetnonblocking(conn, 1) != 0)
    1076           0 :         pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
    1077             : 
    1078       22368 :     while (recv_step != BI_DONE)
    1079             :     {
    1080             :         int         sock;
    1081             :         fd_set      input_mask;
    1082             :         fd_set      output_mask;
    1083             : 
    1084       22366 :         sock = PQsocket(conn);
    1085             : 
    1086       22366 :         if (sock < 0)
    1087           0 :             break;              /* shouldn't happen */
    1088             : 
    1089       22366 :         FD_ZERO(&input_mask);
    1090       22366 :         FD_SET(sock, &input_mask);
    1091       22366 :         FD_ZERO(&output_mask);
    1092       22366 :         FD_SET(sock, &output_mask);
    1093             : 
    1094       22366 :         if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
    1095             :         {
    1096           0 :             fprintf(stderr, "select() failed: %m\n");
    1097           0 :             exit_nicely(conn);
    1098             :         }
    1099             : 
    1100             :         /*
    1101             :          * Process any results, so we keep the server's output buffer free
    1102             :          * flowing and it can continue to process input
    1103             :          */
    1104       22366 :         if (FD_ISSET(sock, &input_mask))
    1105             :         {
    1106           6 :             PQconsumeInput(conn);
    1107             : 
    1108             :             /* Read until we'd block if we tried to read */
    1109        2828 :             while (!PQisBusy(conn) && recv_step < BI_DONE)
    1110             :             {
    1111             :                 PGresult   *res;
    1112        2822 :                 const char *cmdtag = "";
    1113        2822 :                 const char *description = "";
    1114             :                 int         status;
    1115             : 
    1116             :                 /*
    1117             :                  * Read next result.  If no more results from this query,
    1118             :                  * advance to the next query
    1119             :                  */
    1120        2822 :                 res = PQgetResult(conn);
    1121        2822 :                 if (res == NULL)
    1122        1410 :                     continue;
    1123             : 
    1124        1412 :                 status = PGRES_COMMAND_OK;
    1125        1412 :                 switch (recv_step)
    1126             :                 {
    1127           2 :                     case BI_BEGIN_TX:
    1128           2 :                         cmdtag = "BEGIN";
    1129           2 :                         recv_step++;
    1130           2 :                         break;
    1131           2 :                     case BI_DROP_TABLE:
    1132           2 :                         cmdtag = "DROP TABLE";
    1133           2 :                         recv_step++;
    1134           2 :                         break;
    1135           2 :                     case BI_CREATE_TABLE:
    1136           2 :                         cmdtag = "CREATE TABLE";
    1137           2 :                         recv_step++;
    1138           2 :                         break;
    1139           2 :                     case BI_PREPARE:
    1140           2 :                         cmdtag = "";
    1141           2 :                         description = "PREPARE";
    1142           2 :                         recv_step++;
    1143           2 :                         break;
    1144        1400 :                     case BI_INSERT_ROWS:
    1145        1400 :                         cmdtag = "INSERT";
    1146        1400 :                         rows_to_receive--;
    1147        1400 :                         if (rows_to_receive == 0)
    1148           2 :                             recv_step++;
    1149        1400 :                         break;
    1150           2 :                     case BI_COMMIT_TX:
    1151           2 :                         cmdtag = "COMMIT";
    1152           2 :                         recv_step++;
    1153           2 :                         break;
    1154           2 :                     case BI_SYNC:
    1155           2 :                         cmdtag = "";
    1156           2 :                         description = "SYNC";
    1157           2 :                         status = PGRES_PIPELINE_SYNC;
    1158           2 :                         recv_step++;
    1159           2 :                         break;
    1160           0 :                     case BI_DONE:
    1161             :                         /* unreachable */
    1162           0 :                         pg_fatal("unreachable state");
    1163             :                 }
    1164             : 
    1165        1412 :                 if (PQresultStatus(res) != status)
    1166           0 :                     pg_fatal("%s reported status %s, expected %s\n"
    1167             :                              "Error message: \"%s\"",
    1168             :                              description, PQresStatus(PQresultStatus(res)),
    1169             :                              PQresStatus(status), PQerrorMessage(conn));
    1170             : 
    1171        1412 :                 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
    1172           0 :                     pg_fatal("%s expected command tag '%s', got '%s'",
    1173             :                              description, cmdtag, PQcmdStatus(res));
    1174             : 
    1175             :                 pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
    1176             : 
    1177        1412 :                 PQclear(res);
    1178             :             }
    1179             :         }
    1180             : 
    1181             :         /* Write more rows and/or the end pipeline message, if needed */
    1182       22366 :         if (FD_ISSET(sock, &output_mask))
    1183             :         {
    1184       22364 :             PQflush(conn);
    1185             : 
    1186       22364 :             if (send_step == BI_INSERT_ROWS)
    1187             :             {
    1188        1400 :                 snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
    1189             :                 /* use up some buffer space with a wide value */
    1190        1400 :                 snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
    1191             : 
    1192        1400 :                 if (PQsendQueryPrepared(conn, "my_insert",
    1193             :                                         2, insert_params, NULL, NULL, 0) == 1)
    1194             :                 {
    1195             :                     pg_debug("sent row %d\n", rows_to_send);
    1196             : 
    1197        1400 :                     rows_to_send--;
    1198        1400 :                     if (rows_to_send == 0)
    1199           2 :                         send_step++;
    1200             :                 }
    1201             :                 else
    1202             :                 {
    1203             :                     /*
    1204             :                      * in nonblocking mode, so it's OK for an insert to fail
    1205             :                      * to send
    1206             :                      */
    1207           0 :                     fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
    1208             :                             rows_to_send, PQerrorMessage(conn));
    1209             :                 }
    1210             :             }
    1211       20964 :             else if (send_step == BI_COMMIT_TX)
    1212             :             {
    1213           2 :                 if (PQsendQueryParams(conn, "COMMIT",
    1214             :                                       0, NULL, NULL, NULL, NULL, 0) == 1)
    1215             :                 {
    1216             :                     pg_debug("sent COMMIT\n");
    1217           2 :                     send_step++;
    1218             :                 }
    1219             :                 else
    1220             :                 {
    1221           0 :                     fprintf(stderr, "WARNING: failed to send commit: %s\n",
    1222             :                             PQerrorMessage(conn));
    1223             :                 }
    1224             :             }
    1225       20962 :             else if (send_step == BI_SYNC)
    1226             :             {
    1227           2 :                 if (PQpipelineSync(conn) == 1)
    1228             :                 {
    1229           2 :                     fprintf(stdout, "pipeline sync sent\n");
    1230           2 :                     send_step++;
    1231             :                 }
    1232             :                 else
    1233             :                 {
    1234           0 :                     fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
    1235             :                             PQerrorMessage(conn));
    1236             :                 }
    1237             :             }
    1238             :         }
    1239             :     }
    1240             : 
    1241             :     /* We've got the sync message and the pipeline should be done */
    1242           2 :     if (PQexitPipelineMode(conn) != 1)
    1243           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
    1244             :                  PQerrorMessage(conn));
    1245             : 
    1246           2 :     if (PQsetnonblocking(conn, 0) != 0)
    1247           0 :         pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
    1248             : 
    1249           2 :     fprintf(stderr, "ok\n");
    1250           2 : }
    1251             : 
    1252             : static void
    1253           2 : test_prepared(PGconn *conn)
    1254             : {
    1255           2 :     PGresult   *res = NULL;
    1256           2 :     Oid         param_oids[1] = {INT4OID};
    1257             :     Oid         expected_oids[4];
    1258             :     Oid         typ;
    1259             : 
    1260           2 :     fprintf(stderr, "prepared... ");
    1261             : 
    1262           2 :     if (PQenterPipelineMode(conn) != 1)
    1263           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1264           2 :     if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
    1265             :                       "interval '1 sec'",
    1266             :                       1, param_oids) != 1)
    1267           0 :         pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
    1268           2 :     expected_oids[0] = INT4OID;
    1269           2 :     expected_oids[1] = TEXTOID;
    1270           2 :     expected_oids[2] = NUMERICOID;
    1271           2 :     expected_oids[3] = INTERVALOID;
    1272           2 :     if (PQsendDescribePrepared(conn, "select_one") != 1)
    1273           0 :         pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
    1274           2 :     if (PQpipelineSync(conn) != 1)
    1275           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1276             : 
    1277           2 :     res = PQgetResult(conn);
    1278           2 :     if (res == NULL)
    1279           0 :         pg_fatal("PQgetResult returned null");
    1280           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1281           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1282           2 :     PQclear(res);
    1283           2 :     res = PQgetResult(conn);
    1284           2 :     if (res != NULL)
    1285           0 :         pg_fatal("expected NULL result");
    1286             : 
    1287           2 :     res = PQgetResult(conn);
    1288           2 :     if (res == NULL)
    1289           0 :         pg_fatal("PQgetResult returned NULL");
    1290           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1291           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1292           2 :     if (PQnfields(res) != lengthof(expected_oids))
    1293           0 :         pg_fatal("expected %zu columns, got %d",
    1294             :                  lengthof(expected_oids), PQnfields(res));
    1295          10 :     for (int i = 0; i < PQnfields(res); i++)
    1296             :     {
    1297           8 :         typ = PQftype(res, i);
    1298           8 :         if (typ != expected_oids[i])
    1299           0 :             pg_fatal("field %d: expected type %u, got %u",
    1300             :                      i, expected_oids[i], typ);
    1301             :     }
    1302           2 :     PQclear(res);
    1303           2 :     res = PQgetResult(conn);
    1304           2 :     if (res != NULL)
    1305           0 :         pg_fatal("expected NULL result");
    1306             : 
    1307           2 :     res = PQgetResult(conn);
    1308           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1309           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
    1310             : 
    1311           2 :     fprintf(stderr, "closing statement..");
    1312           2 :     if (PQsendClosePrepared(conn, "select_one") != 1)
    1313           0 :         pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
    1314           2 :     if (PQpipelineSync(conn) != 1)
    1315           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1316             : 
    1317           2 :     res = PQgetResult(conn);
    1318           2 :     if (res == NULL)
    1319           0 :         pg_fatal("expected non-NULL result");
    1320           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1321           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1322           2 :     PQclear(res);
    1323           2 :     res = PQgetResult(conn);
    1324           2 :     if (res != NULL)
    1325           0 :         pg_fatal("expected NULL result");
    1326           2 :     res = PQgetResult(conn);
    1327           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1328           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
    1329             : 
    1330           2 :     if (PQexitPipelineMode(conn) != 1)
    1331           0 :         pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
    1332             : 
    1333             :     /* Now that it's closed we should get an error when describing */
    1334           2 :     res = PQdescribePrepared(conn, "select_one");
    1335           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
    1336           0 :         pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
    1337             : 
    1338             :     /*
    1339             :      * Also test the blocking close, this should not fail since closing a
    1340             :      * non-existent prepared statement is a no-op
    1341             :      */
    1342           2 :     res = PQclosePrepared(conn, "select_one");
    1343           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1344           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1345             : 
    1346           2 :     fprintf(stderr, "creating portal... ");
    1347           2 :     PQexec(conn, "BEGIN");
    1348           2 :     PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
    1349           2 :     PQenterPipelineMode(conn);
    1350           2 :     if (PQsendDescribePortal(conn, "cursor_one") != 1)
    1351           0 :         pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
    1352           2 :     if (PQpipelineSync(conn) != 1)
    1353           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1354           2 :     res = PQgetResult(conn);
    1355           2 :     if (res == NULL)
    1356           0 :         pg_fatal("PQgetResult returned null");
    1357           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1358           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1359             : 
    1360           2 :     typ = PQftype(res, 0);
    1361           2 :     if (typ != INT4OID)
    1362           0 :         pg_fatal("portal: expected type %u, got %u",
    1363             :                  INT4OID, typ);
    1364           2 :     PQclear(res);
    1365           2 :     res = PQgetResult(conn);
    1366           2 :     if (res != NULL)
    1367           0 :         pg_fatal("expected NULL result");
    1368           2 :     res = PQgetResult(conn);
    1369           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1370           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
    1371             : 
    1372           2 :     fprintf(stderr, "closing portal... ");
    1373           2 :     if (PQsendClosePortal(conn, "cursor_one") != 1)
    1374           0 :         pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
    1375           2 :     if (PQpipelineSync(conn) != 1)
    1376           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1377             : 
    1378           2 :     res = PQgetResult(conn);
    1379           2 :     if (res == NULL)
    1380           0 :         pg_fatal("expected non-NULL result");
    1381           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1382           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1383           2 :     PQclear(res);
    1384           2 :     res = PQgetResult(conn);
    1385           2 :     if (res != NULL)
    1386           0 :         pg_fatal("expected NULL result");
    1387           2 :     res = PQgetResult(conn);
    1388           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1389           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
    1390             : 
    1391           2 :     if (PQexitPipelineMode(conn) != 1)
    1392           0 :         pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
    1393             : 
    1394             :     /* Now that it's closed we should get an error when describing */
    1395           2 :     res = PQdescribePortal(conn, "cursor_one");
    1396           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
    1397           0 :         pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
    1398             : 
    1399             :     /*
    1400             :      * Also test the blocking close, this should not fail since closing a
    1401             :      * non-existent portal is a no-op
    1402             :      */
    1403           2 :     res = PQclosePortal(conn, "cursor_one");
    1404           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1405           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1406             : 
    1407           2 :     fprintf(stderr, "ok\n");
    1408           2 : }
    1409             : 
    1410             : /*
    1411             :  * Test max_protocol_version options.
    1412             :  */
    1413             : static void
    1414           2 : test_protocol_version(PGconn *conn)
    1415             : {
    1416             :     const char **keywords;
    1417             :     const char **vals;
    1418             :     int         nopts;
    1419           2 :     PQconninfoOption *opts = PQconninfo(conn);
    1420             :     int         protocol_version;
    1421             :     int         max_protocol_version_index;
    1422             :     int         i;
    1423             : 
    1424             :     /*
    1425             :      * Prepare keywords/vals arrays, copied from the existing connection, with
    1426             :      * an extra slot for 'max_protocol_version'.
    1427             :      */
    1428           2 :     nopts = 0;
    1429         102 :     for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
    1430         100 :         nopts++;
    1431           2 :     nopts++;                    /* max_protocol_version */
    1432           2 :     nopts++;                    /* NULL terminator */
    1433             : 
    1434           2 :     keywords = pg_malloc0(sizeof(char *) * nopts);
    1435           2 :     vals = pg_malloc0(sizeof(char *) * nopts);
    1436             : 
    1437           2 :     i = 0;
    1438         102 :     for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
    1439             :     {
    1440         100 :         if (opt->val)
    1441             :         {
    1442          40 :             keywords[i] = opt->keyword;
    1443          40 :             vals[i] = opt->val;
    1444          40 :             i++;
    1445             :         }
    1446             :     }
    1447             : 
    1448           2 :     max_protocol_version_index = i;
    1449           2 :     keywords[i] = "max_protocol_version"; /* value is filled in below */
    1450           2 :     i++;
    1451           2 :     keywords[i] = vals[i] = NULL;
    1452             : 
    1453             :     /*
    1454             :      * Test max_protocol_version=3.0
    1455             :      */
    1456           2 :     vals[max_protocol_version_index] = "3.0";
    1457           2 :     conn = PQconnectdbParams(keywords, vals, false);
    1458             : 
    1459           2 :     if (PQstatus(conn) != CONNECTION_OK)
    1460           0 :         pg_fatal("Connection to database failed: %s",
    1461             :                  PQerrorMessage(conn));
    1462             : 
    1463           2 :     protocol_version = PQfullProtocolVersion(conn);
    1464           2 :     if (protocol_version != 30000)
    1465           0 :         pg_fatal("expected 30000, got %d", protocol_version);
    1466             : 
    1467           2 :     PQfinish(conn);
    1468             : 
    1469             :     /*
    1470             :      * Test max_protocol_version=3.1. It's not valid, we went straight from
    1471             :      * 3.0 to 3.2.
    1472             :      */
    1473           2 :     vals[max_protocol_version_index] = "3.1";
    1474           2 :     conn = PQconnectdbParams(keywords, vals, false);
    1475             : 
    1476           2 :     if (PQstatus(conn) != CONNECTION_BAD)
    1477           0 :         pg_fatal("Connecting with max_protocol_version 3.1 should have failed.");
    1478             : 
    1479           2 :     PQfinish(conn);
    1480             : 
    1481             :     /*
    1482             :      * Test max_protocol_version=3.2
    1483             :      */
    1484           2 :     vals[max_protocol_version_index] = "3.2";
    1485           2 :     conn = PQconnectdbParams(keywords, vals, false);
    1486             : 
    1487           2 :     if (PQstatus(conn) != CONNECTION_OK)
    1488           0 :         pg_fatal("Connection to database failed: %s",
    1489             :                  PQerrorMessage(conn));
    1490             : 
    1491           2 :     protocol_version = PQfullProtocolVersion(conn);
    1492           2 :     if (protocol_version != 30002)
    1493           0 :         pg_fatal("expected 30002, got %d", protocol_version);
    1494             : 
    1495           2 :     PQfinish(conn);
    1496             : 
    1497             :     /*
    1498             :      * Test max_protocol_version=latest. 'latest' currently means '3.2'.
    1499             :      */
    1500           2 :     vals[max_protocol_version_index] = "latest";
    1501           2 :     conn = PQconnectdbParams(keywords, vals, false);
    1502             : 
    1503           2 :     if (PQstatus(conn) != CONNECTION_OK)
    1504           0 :         pg_fatal("Connection to database failed: %s",
    1505             :                  PQerrorMessage(conn));
    1506             : 
    1507           2 :     protocol_version = PQfullProtocolVersion(conn);
    1508           2 :     if (protocol_version != 30002)
    1509           0 :         pg_fatal("expected 30002, got %d", protocol_version);
    1510             : 
    1511           2 :     PQfinish(conn);
    1512           2 : }
    1513             : 
    1514             : /* Notice processor: print notices, and count how many we got */
    1515             : static void
    1516           2 : notice_processor(void *arg, const char *message)
    1517             : {
    1518           2 :     int        *n_notices = (int *) arg;
    1519             : 
    1520           2 :     (*n_notices)++;
    1521           2 :     fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
    1522           2 : }
    1523             : 
    1524             : /* Verify behavior in "idle" state */
    1525             : static void
    1526           2 : test_pipeline_idle(PGconn *conn)
    1527             : {
    1528             :     PGresult   *res;
    1529           2 :     int         n_notices = 0;
    1530             : 
    1531           2 :     fprintf(stderr, "\npipeline idle...\n");
    1532             : 
    1533           2 :     PQsetNoticeProcessor(conn, notice_processor, &n_notices);
    1534             : 
    1535             :     /* Try to exit pipeline mode in pipeline-idle state */
    1536           2 :     if (PQenterPipelineMode(conn) != 1)
    1537           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1538           2 :     if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1539           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1540           2 :     PQsendFlushRequest(conn);
    1541           2 :     res = PQgetResult(conn);
    1542           2 :     if (res == NULL)
    1543           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
    1544             :                  PQerrorMessage(conn));
    1545           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1546           0 :         pg_fatal("unexpected result code %s from first pipeline item",
    1547             :                  PQresStatus(PQresultStatus(res)));
    1548           2 :     PQclear(res);
    1549           2 :     res = PQgetResult(conn);
    1550           2 :     if (res != NULL)
    1551           0 :         pg_fatal("did not receive terminating NULL");
    1552           2 :     if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1553           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1554           2 :     if (PQexitPipelineMode(conn) == 1)
    1555           0 :         pg_fatal("exiting pipeline succeeded when it shouldn't");
    1556           2 :     if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
    1557             :                 strlen("cannot exit pipeline mode")) != 0)
    1558           0 :         pg_fatal("did not get expected error; got: %s",
    1559             :                  PQerrorMessage(conn));
    1560           2 :     PQsendFlushRequest(conn);
    1561           2 :     res = PQgetResult(conn);
    1562           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1563           0 :         pg_fatal("unexpected result code %s from second pipeline item",
    1564             :                  PQresStatus(PQresultStatus(res)));
    1565           2 :     PQclear(res);
    1566           2 :     res = PQgetResult(conn);
    1567           2 :     if (res != NULL)
    1568           0 :         pg_fatal("did not receive terminating NULL");
    1569           2 :     if (PQexitPipelineMode(conn) != 1)
    1570           0 :         pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
    1571             : 
    1572           2 :     if (n_notices > 0)
    1573           0 :         pg_fatal("got %d notice(s)", n_notices);
    1574           2 :     fprintf(stderr, "ok - 1\n");
    1575             : 
    1576             :     /* Have a WARNING in the middle of a resultset */
    1577           2 :     if (PQenterPipelineMode(conn) != 1)
    1578           0 :         pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
    1579           2 :     if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1580           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1581           2 :     PQsendFlushRequest(conn);
    1582           2 :     res = PQgetResult(conn);
    1583           2 :     if (res == NULL)
    1584           0 :         pg_fatal("unexpected NULL result received");
    1585           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1586           0 :         pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
    1587           2 :     if (PQexitPipelineMode(conn) != 1)
    1588           0 :         pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
    1589           2 :     fprintf(stderr, "ok - 2\n");
    1590           2 : }
    1591             : 
    1592             : static void
    1593           2 : test_simple_pipeline(PGconn *conn)
    1594             : {
    1595           2 :     PGresult   *res = NULL;
    1596           2 :     const char *dummy_params[1] = {"1"};
    1597           2 :     Oid         dummy_param_oids[1] = {INT4OID};
    1598             : 
    1599           2 :     fprintf(stderr, "simple pipeline... ");
    1600             : 
    1601             :     /*
    1602             :      * Enter pipeline mode and dispatch a set of operations, which we'll then
    1603             :      * process the results of as they come in.
    1604             :      *
    1605             :      * For a simple case we should be able to do this without interim
    1606             :      * processing of results since our output buffer will give us enough slush
    1607             :      * to work with and we won't block on sending. So blocking mode is fine.
    1608             :      */
    1609           2 :     if (PQisnonblocking(conn))
    1610           0 :         pg_fatal("Expected blocking connection mode");
    1611             : 
    1612           2 :     if (PQenterPipelineMode(conn) != 1)
    1613           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1614             : 
    1615           2 :     if (PQsendQueryParams(conn, "SELECT $1",
    1616             :                           1, dummy_param_oids, dummy_params,
    1617             :                           NULL, NULL, 0) != 1)
    1618           0 :         pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
    1619             : 
    1620           2 :     if (PQexitPipelineMode(conn) != 0)
    1621           0 :         pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
    1622             : 
    1623           2 :     if (PQpipelineSync(conn) != 1)
    1624           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1625             : 
    1626           2 :     res = PQgetResult(conn);
    1627           2 :     if (res == NULL)
    1628           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
    1629             :                  PQerrorMessage(conn));
    1630             : 
    1631           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1632           0 :         pg_fatal("Unexpected result code %s from first pipeline item",
    1633             :                  PQresStatus(PQresultStatus(res)));
    1634             : 
    1635           2 :     PQclear(res);
    1636           2 :     res = NULL;
    1637             : 
    1638           2 :     if (PQgetResult(conn) != NULL)
    1639           0 :         pg_fatal("PQgetResult returned something extra after first query result.");
    1640             : 
    1641             :     /*
    1642             :      * Even though we've processed the result there's still a sync to come and
    1643             :      * we can't exit pipeline mode yet
    1644             :      */
    1645           2 :     if (PQexitPipelineMode(conn) != 0)
    1646           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
    1647             : 
    1648           2 :     res = PQgetResult(conn);
    1649           2 :     if (res == NULL)
    1650           0 :         pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
    1651             :                  PQerrorMessage(conn));
    1652             : 
    1653           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1654           0 :         pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
    1655             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
    1656             : 
    1657           2 :     PQclear(res);
    1658           2 :     res = NULL;
    1659             : 
    1660           2 :     if (PQgetResult(conn) != NULL)
    1661           0 :         pg_fatal("PQgetResult returned something extra after pipeline end: %s",
    1662             :                  PQresStatus(PQresultStatus(res)));
    1663             : 
    1664             :     /* We're still in pipeline mode... */
    1665           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
    1666           0 :         pg_fatal("Fell out of pipeline mode somehow");
    1667             : 
    1668             :     /* ... until we end it, which we can safely do now */
    1669           2 :     if (PQexitPipelineMode(conn) != 1)
    1670           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
    1671             :                  PQerrorMessage(conn));
    1672             : 
    1673           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
    1674           0 :         pg_fatal("Exiting pipeline mode didn't seem to work");
    1675             : 
    1676           2 :     fprintf(stderr, "ok\n");
    1677           2 : }
    1678             : 
    1679             : static void
    1680           2 : test_singlerowmode(PGconn *conn)
    1681             : {
    1682             :     PGresult   *res;
    1683             :     int         i;
    1684           2 :     bool        pipeline_ended = false;
    1685             : 
    1686           2 :     if (PQenterPipelineMode(conn) != 1)
    1687           0 :         pg_fatal("failed to enter pipeline mode: %s",
    1688             :                  PQerrorMessage(conn));
    1689             : 
    1690             :     /* One series of three commands, using single-row mode for the first two. */
    1691           8 :     for (i = 0; i < 3; i++)
    1692             :     {
    1693             :         char       *param[1];
    1694             : 
    1695           6 :         param[0] = psprintf("%d", 44 + i);
    1696             : 
    1697           6 :         if (PQsendQueryParams(conn,
    1698             :                               "SELECT generate_series(42, $1)",
    1699             :                               1,
    1700             :                               NULL,
    1701             :                               (const char **) param,
    1702             :                               NULL,
    1703             :                               NULL,
    1704             :                               0) != 1)
    1705           0 :             pg_fatal("failed to send query: %s",
    1706             :                      PQerrorMessage(conn));
    1707           6 :         pfree(param[0]);
    1708             :     }
    1709           2 :     if (PQpipelineSync(conn) != 1)
    1710           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1711             : 
    1712          10 :     for (i = 0; !pipeline_ended; i++)
    1713             :     {
    1714           8 :         bool        first = true;
    1715             :         bool        saw_ending_tuplesok;
    1716           8 :         bool        isSingleTuple = false;
    1717             : 
    1718             :         /* Set single row mode for only first 2 SELECT queries */
    1719           8 :         if (i < 2)
    1720             :         {
    1721           4 :             if (PQsetSingleRowMode(conn) != 1)
    1722           0 :                 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
    1723             :         }
    1724             : 
    1725             :         /* Consume rows for this query */
    1726           8 :         saw_ending_tuplesok = false;
    1727          28 :         while ((res = PQgetResult(conn)) != NULL)
    1728             :         {
    1729          22 :             ExecStatusType est = PQresultStatus(res);
    1730             : 
    1731          22 :             if (est == PGRES_PIPELINE_SYNC)
    1732             :             {
    1733           2 :                 fprintf(stderr, "end of pipeline reached\n");
    1734           2 :                 pipeline_ended = true;
    1735           2 :                 PQclear(res);
    1736           2 :                 if (i != 3)
    1737           0 :                     pg_fatal("Expected three results, got %d", i);
    1738           2 :                 break;
    1739             :             }
    1740             : 
    1741             :             /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
    1742          20 :             if (first)
    1743             :             {
    1744           6 :                 if (i <= 1 && est != PGRES_SINGLE_TUPLE)
    1745           0 :                     pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
    1746             :                              i, PQresStatus(est));
    1747           6 :                 if (i >= 2 && est != PGRES_TUPLES_OK)
    1748           0 :                     pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
    1749             :                              i, PQresStatus(est));
    1750           6 :                 first = false;
    1751             :             }
    1752             : 
    1753          20 :             fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
    1754          20 :             switch (est)
    1755             :             {
    1756           6 :                 case PGRES_TUPLES_OK:
    1757           6 :                     fprintf(stderr, ", tuples: %d\n", PQntuples(res));
    1758           6 :                     saw_ending_tuplesok = true;
    1759           6 :                     if (isSingleTuple)
    1760             :                     {
    1761           4 :                         if (PQntuples(res) == 0)
    1762           4 :                             fprintf(stderr, "all tuples received in query %d\n", i);
    1763             :                         else
    1764           0 :                             pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
    1765             :                     }
    1766           6 :                     break;
    1767             : 
    1768          14 :                 case PGRES_SINGLE_TUPLE:
    1769          14 :                     isSingleTuple = true;
    1770          14 :                     fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
    1771          14 :                     break;
    1772             : 
    1773           0 :                 default:
    1774           0 :                     pg_fatal("unexpected");
    1775             :             }
    1776          20 :             PQclear(res);
    1777             :         }
    1778           8 :         if (!pipeline_ended && !saw_ending_tuplesok)
    1779           0 :             pg_fatal("didn't get expected terminating TUPLES_OK");
    1780             :     }
    1781             : 
    1782             :     /*
    1783             :      * Now issue one command, get its results in with single-row mode, then
    1784             :      * issue another command, and get its results in normal mode; make sure
    1785             :      * the single-row mode flag is reset as expected.
    1786             :      */
    1787           2 :     if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
    1788             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1789           0 :         pg_fatal("failed to send query: %s",
    1790             :                  PQerrorMessage(conn));
    1791           2 :     if (PQsendFlushRequest(conn) != 1)
    1792           0 :         pg_fatal("failed to send flush request");
    1793           2 :     if (PQsetSingleRowMode(conn) != 1)
    1794           0 :         pg_fatal("PQsetSingleRowMode() failed");
    1795           2 :     res = PQgetResult(conn);
    1796           2 :     if (res == NULL)
    1797           0 :         pg_fatal("unexpected NULL");
    1798           2 :     if (PQresultStatus(res) != PGRES_SINGLE_TUPLE)
    1799           0 :         pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
    1800             :                  PQresStatus(PQresultStatus(res)));
    1801           2 :     res = PQgetResult(conn);
    1802           2 :     if (res == NULL)
    1803           0 :         pg_fatal("unexpected NULL");
    1804           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1805           0 :         pg_fatal("Expected PGRES_TUPLES_OK, got %s",
    1806             :                  PQresStatus(PQresultStatus(res)));
    1807           2 :     if (PQgetResult(conn) != NULL)
    1808           0 :         pg_fatal("expected NULL result");
    1809             : 
    1810           2 :     if (PQsendQueryParams(conn, "SELECT 1",
    1811             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1812           0 :         pg_fatal("failed to send query: %s",
    1813             :                  PQerrorMessage(conn));
    1814           2 :     if (PQsendFlushRequest(conn) != 1)
    1815           0 :         pg_fatal("failed to send flush request");
    1816           2 :     res = PQgetResult(conn);
    1817           2 :     if (res == NULL)
    1818           0 :         pg_fatal("unexpected NULL");
    1819           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1820           0 :         pg_fatal("Expected PGRES_TUPLES_OK, got %s",
    1821             :                  PQresStatus(PQresultStatus(res)));
    1822           2 :     if (PQgetResult(conn) != NULL)
    1823           0 :         pg_fatal("expected NULL result");
    1824             : 
    1825             :     /*
    1826             :      * Try chunked mode as well; make sure that it correctly delivers a
    1827             :      * partial final chunk.
    1828             :      */
    1829           2 :     if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
    1830             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1831           0 :         pg_fatal("failed to send query: %s",
    1832             :                  PQerrorMessage(conn));
    1833           2 :     if (PQsendFlushRequest(conn) != 1)
    1834           0 :         pg_fatal("failed to send flush request");
    1835           2 :     if (PQsetChunkedRowsMode(conn, 3) != 1)
    1836           0 :         pg_fatal("PQsetChunkedRowsMode() failed");
    1837           2 :     res = PQgetResult(conn);
    1838           2 :     if (res == NULL)
    1839           0 :         pg_fatal("unexpected NULL");
    1840           2 :     if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
    1841           0 :         pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
    1842             :                  PQresStatus(PQresultStatus(res)),
    1843             :                  PQerrorMessage(conn));
    1844           2 :     if (PQntuples(res) != 3)
    1845           0 :         pg_fatal("Expected 3 rows, got %d", PQntuples(res));
    1846           2 :     res = PQgetResult(conn);
    1847           2 :     if (res == NULL)
    1848           0 :         pg_fatal("unexpected NULL");
    1849           2 :     if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
    1850           0 :         pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
    1851             :                  PQresStatus(PQresultStatus(res)));
    1852           2 :     if (PQntuples(res) != 2)
    1853           0 :         pg_fatal("Expected 2 rows, got %d", PQntuples(res));
    1854           2 :     res = PQgetResult(conn);
    1855           2 :     if (res == NULL)
    1856           0 :         pg_fatal("unexpected NULL");
    1857           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1858           0 :         pg_fatal("Expected PGRES_TUPLES_OK, got %s",
    1859             :                  PQresStatus(PQresultStatus(res)));
    1860           2 :     if (PQntuples(res) != 0)
    1861           0 :         pg_fatal("Expected 0 rows, got %d", PQntuples(res));
    1862           2 :     if (PQgetResult(conn) != NULL)
    1863           0 :         pg_fatal("expected NULL result");
    1864             : 
    1865           2 :     if (PQexitPipelineMode(conn) != 1)
    1866           0 :         pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
    1867             : 
    1868           2 :     fprintf(stderr, "ok\n");
    1869           2 : }
    1870             : 
    1871             : /*
    1872             :  * Simple test to verify that a pipeline is discarded as a whole when there's
    1873             :  * an error, ignoring transaction commands.
    1874             :  */
    1875             : static void
    1876           2 : test_transaction(PGconn *conn)
    1877             : {
    1878             :     PGresult   *res;
    1879             :     bool        expect_null;
    1880           2 :     int         num_syncs = 0;
    1881             : 
    1882           2 :     res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
    1883             :                  "CREATE TABLE pq_pipeline_tst (id int)");
    1884           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1885           0 :         pg_fatal("failed to create test table: %s",
    1886             :                  PQerrorMessage(conn));
    1887           2 :     PQclear(res);
    1888             : 
    1889           2 :     if (PQenterPipelineMode(conn) != 1)
    1890           0 :         pg_fatal("failed to enter pipeline mode: %s",
    1891             :                  PQerrorMessage(conn));
    1892           2 :     if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
    1893           0 :         pg_fatal("could not send prepare on pipeline: %s",
    1894             :                  PQerrorMessage(conn));
    1895             : 
    1896           2 :     if (PQsendQueryParams(conn,
    1897             :                           "BEGIN",
    1898             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1899           0 :         pg_fatal("failed to send query: %s",
    1900             :                  PQerrorMessage(conn));
    1901           2 :     if (PQsendQueryParams(conn,
    1902             :                           "SELECT 0/0",
    1903             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1904           0 :         pg_fatal("failed to send query: %s",
    1905             :                  PQerrorMessage(conn));
    1906             : 
    1907             :     /*
    1908             :      * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
    1909             :      * get out of the pipeline-aborted state first.
    1910             :      */
    1911           2 :     if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
    1912           0 :         pg_fatal("failed to execute prepared: %s",
    1913             :                  PQerrorMessage(conn));
    1914             : 
    1915             :     /* This insert fails because we're in pipeline-aborted state */
    1916           2 :     if (PQsendQueryParams(conn,
    1917             :                           "INSERT INTO pq_pipeline_tst VALUES (1)",
    1918             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1919           0 :         pg_fatal("failed to send query: %s",
    1920             :                  PQerrorMessage(conn));
    1921           2 :     if (PQpipelineSync(conn) != 1)
    1922           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1923           2 :     num_syncs++;
    1924             : 
    1925             :     /*
    1926             :      * This insert fails even though the pipeline got a SYNC, because we're in
    1927             :      * an aborted transaction
    1928             :      */
    1929           2 :     if (PQsendQueryParams(conn,
    1930             :                           "INSERT INTO pq_pipeline_tst VALUES (2)",
    1931             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1932           0 :         pg_fatal("failed to send query: %s",
    1933             :                  PQerrorMessage(conn));
    1934           2 :     if (PQpipelineSync(conn) != 1)
    1935           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1936           2 :     num_syncs++;
    1937             : 
    1938             :     /*
    1939             :      * Send ROLLBACK using prepared stmt. This one works because we just did
    1940             :      * PQpipelineSync above.
    1941             :      */
    1942           2 :     if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
    1943           0 :         pg_fatal("failed to execute prepared: %s",
    1944             :                  PQerrorMessage(conn));
    1945             : 
    1946             :     /*
    1947             :      * Now that we're out of a transaction and in pipeline-good mode, this
    1948             :      * insert works
    1949             :      */
    1950           2 :     if (PQsendQueryParams(conn,
    1951             :                           "INSERT INTO pq_pipeline_tst VALUES (3)",
    1952             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1953           0 :         pg_fatal("failed to send query: %s",
    1954             :                  PQerrorMessage(conn));
    1955             :     /* Send two syncs now -- match up to SYNC messages below */
    1956           2 :     if (PQpipelineSync(conn) != 1)
    1957           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1958           2 :     num_syncs++;
    1959           2 :     if (PQpipelineSync(conn) != 1)
    1960           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1961           2 :     num_syncs++;
    1962             : 
    1963           2 :     expect_null = false;
    1964           2 :     for (int i = 0;; i++)
    1965          38 :     {
    1966             :         ExecStatusType restype;
    1967             : 
    1968          40 :         res = PQgetResult(conn);
    1969          40 :         if (res == NULL)
    1970             :         {
    1971          16 :             printf("%d: got NULL result\n", i);
    1972          16 :             if (!expect_null)
    1973           0 :                 pg_fatal("did not expect NULL here");
    1974          16 :             expect_null = false;
    1975          16 :             continue;
    1976             :         }
    1977          24 :         restype = PQresultStatus(res);
    1978          24 :         printf("%d: got status %s", i, PQresStatus(restype));
    1979          24 :         if (expect_null)
    1980           0 :             pg_fatal("expected NULL");
    1981          24 :         if (restype == PGRES_FATAL_ERROR)
    1982           4 :             printf("; error: %s", PQerrorMessage(conn));
    1983          20 :         else if (restype == PGRES_PIPELINE_ABORTED)
    1984             :         {
    1985           4 :             printf(": command didn't run because pipeline aborted\n");
    1986             :         }
    1987             :         else
    1988          16 :             printf("\n");
    1989          24 :         PQclear(res);
    1990             : 
    1991          24 :         if (restype == PGRES_PIPELINE_SYNC)
    1992           8 :             num_syncs--;
    1993             :         else
    1994          16 :             expect_null = true;
    1995          24 :         if (num_syncs <= 0)
    1996           2 :             break;
    1997             :     }
    1998           2 :     if (PQgetResult(conn) != NULL)
    1999           0 :         pg_fatal("returned something extra after all the syncs: %s",
    2000             :                  PQresStatus(PQresultStatus(res)));
    2001             : 
    2002           2 :     if (PQexitPipelineMode(conn) != 1)
    2003           0 :         pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
    2004             : 
    2005             :     /* We expect to find one tuple containing the value "3" */
    2006           2 :     res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
    2007           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    2008           0 :         pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
    2009           2 :     if (PQntuples(res) != 1)
    2010           0 :         pg_fatal("did not get 1 tuple");
    2011           2 :     if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
    2012           0 :         pg_fatal("did not get expected tuple");
    2013           2 :     PQclear(res);
    2014             : 
    2015           2 :     fprintf(stderr, "ok\n");
    2016           2 : }
    2017             : 
    2018             : /*
    2019             :  * In this test mode we send a stream of queries, with one in the middle
    2020             :  * causing an error.  Verify that we can still send some more after the
    2021             :  * error and have libpq work properly.
    2022             :  */
    2023             : static void
    2024           2 : test_uniqviol(PGconn *conn)
    2025             : {
    2026           2 :     int         sock = PQsocket(conn);
    2027             :     PGresult   *res;
    2028           2 :     Oid         paramTypes[2] = {INT8OID, INT8OID};
    2029             :     const char *paramValues[2];
    2030             :     char        paramValue0[MAXINT8LEN];
    2031             :     char        paramValue1[MAXINT8LEN];
    2032           2 :     int         ctr = 0;
    2033           2 :     int         numsent = 0;
    2034           2 :     int         results = 0;
    2035           2 :     bool        read_done = false;
    2036           2 :     bool        write_done = false;
    2037           2 :     bool        error_sent = false;
    2038           2 :     bool        got_error = false;
    2039           2 :     int         switched = 0;
    2040           2 :     int         socketful = 0;
    2041             :     fd_set      in_fds;
    2042             :     fd_set      out_fds;
    2043             : 
    2044           2 :     fprintf(stderr, "uniqviol ...");
    2045             : 
    2046           2 :     PQsetnonblocking(conn, 1);
    2047             : 
    2048           2 :     paramValues[0] = paramValue0;
    2049           2 :     paramValues[1] = paramValue1;
    2050           2 :     sprintf(paramValue1, "42");
    2051             : 
    2052           2 :     res = PQexec(conn, "drop table if exists ppln_uniqviol;"
    2053             :                  "create table ppln_uniqviol(id bigint primary key, idata bigint)");
    2054           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2055           0 :         pg_fatal("failed to create table: %s", PQerrorMessage(conn));
    2056             : 
    2057           2 :     res = PQexec(conn, "begin");
    2058           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2059           0 :         pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
    2060             : 
    2061           2 :     res = PQprepare(conn, "insertion",
    2062             :                     "insert into ppln_uniqviol values ($1, $2) returning id",
    2063             :                     2, paramTypes);
    2064           2 :     if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
    2065           0 :         pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
    2066             : 
    2067           2 :     if (PQenterPipelineMode(conn) != 1)
    2068           0 :         pg_fatal("failed to enter pipeline mode");
    2069             : 
    2070          16 :     while (!read_done)
    2071             :     {
    2072             :         /*
    2073             :          * Avoid deadlocks by reading everything the server has sent before
    2074             :          * sending anything.  (Special precaution is needed here to process
    2075             :          * PQisBusy before testing the socket for read-readiness, because the
    2076             :          * socket does not turn read-ready after "sending" queries in aborted
    2077             :          * pipeline mode.)
    2078             :          */
    2079        1212 :         while (PQisBusy(conn) == 0)
    2080             :         {
    2081             :             bool        new_error;
    2082             : 
    2083        1202 :             if (results >= numsent)
    2084             :             {
    2085           2 :                 if (write_done)
    2086           0 :                     read_done = true;
    2087           2 :                 break;
    2088             :             }
    2089             : 
    2090        1200 :             res = PQgetResult(conn);
    2091        1200 :             new_error = process_result(conn, res, results, numsent);
    2092        1200 :             if (new_error && got_error)
    2093           0 :                 pg_fatal("got two errors");
    2094        1200 :             got_error |= new_error;
    2095        1200 :             if (results++ >= numsent - 1)
    2096             :             {
    2097           4 :                 if (write_done)
    2098           2 :                     read_done = true;
    2099           4 :                 break;
    2100             :             }
    2101             :         }
    2102             : 
    2103          16 :         if (read_done)
    2104           2 :             break;
    2105             : 
    2106          14 :         FD_ZERO(&out_fds);
    2107          14 :         FD_SET(sock, &out_fds);
    2108             : 
    2109          14 :         FD_ZERO(&in_fds);
    2110          14 :         FD_SET(sock, &in_fds);
    2111             : 
    2112          14 :         if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
    2113             :         {
    2114           0 :             if (errno == EINTR)
    2115           0 :                 continue;
    2116           0 :             pg_fatal("select() failed: %m");
    2117             :         }
    2118             : 
    2119          14 :         if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
    2120           0 :             pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
    2121             : 
    2122             :         /*
    2123             :          * If the socket is writable and we haven't finished sending queries,
    2124             :          * send some.
    2125             :          */
    2126          14 :         if (!write_done && FD_ISSET(sock, &out_fds))
    2127             :         {
    2128             :             for (;;)
    2129        1194 :             {
    2130             :                 int         flush;
    2131             : 
    2132             :                 /*
    2133             :                  * provoke uniqueness violation exactly once after having
    2134             :                  * switched to read mode.
    2135             :                  */
    2136        1200 :                 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
    2137             :                 {
    2138           2 :                     sprintf(paramValue0, "%d", numsent / 2);
    2139           2 :                     fprintf(stderr, "E");
    2140           2 :                     error_sent = true;
    2141             :                 }
    2142             :                 else
    2143             :                 {
    2144        1198 :                     fprintf(stderr, ".");
    2145        1198 :                     sprintf(paramValue0, "%d", ctr++);
    2146             :                 }
    2147             : 
    2148        1200 :                 if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
    2149           0 :                     pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
    2150        1200 :                 numsent++;
    2151             : 
    2152             :                 /* Are we done writing? */
    2153        1200 :                 if (socketful != 0 && numsent % socketful == 42 && error_sent)
    2154             :                 {
    2155           2 :                     if (PQsendFlushRequest(conn) != 1)
    2156           0 :                         pg_fatal("failed to send flush request");
    2157           2 :                     write_done = true;
    2158           2 :                     fprintf(stderr, "\ndone writing\n");
    2159           2 :                     PQflush(conn);
    2160           2 :                     break;
    2161             :                 }
    2162             : 
    2163             :                 /* is the outgoing socket full? */
    2164        1198 :                 flush = PQflush(conn);
    2165        1198 :                 if (flush == -1)
    2166           0 :                     pg_fatal("failed to flush: %s", PQerrorMessage(conn));
    2167        1198 :                 if (flush == 1)
    2168             :                 {
    2169           4 :                     if (socketful == 0)
    2170           2 :                         socketful = numsent;
    2171           4 :                     fprintf(stderr, "\nswitch to reading\n");
    2172           4 :                     switched++;
    2173           4 :                     break;
    2174             :                 }
    2175             :             }
    2176             :         }
    2177             :     }
    2178             : 
    2179           2 :     if (!got_error)
    2180           0 :         pg_fatal("did not get expected error");
    2181             : 
    2182           2 :     fprintf(stderr, "ok\n");
    2183           2 : }
    2184             : 
    2185             : /*
    2186             :  * Subroutine for test_uniqviol; given a PGresult, print it out and consume
    2187             :  * the expected NULL that should follow it.
    2188             :  *
    2189             :  * Returns true if we read a fatal error message, otherwise false.
    2190             :  */
    2191             : static bool
    2192        1200 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
    2193             : {
    2194             :     PGresult   *res2;
    2195        1200 :     bool        got_error = false;
    2196             : 
    2197        1200 :     if (res == NULL)
    2198           0 :         pg_fatal("got unexpected NULL");
    2199             : 
    2200        1200 :     switch (PQresultStatus(res))
    2201             :     {
    2202           2 :         case PGRES_FATAL_ERROR:
    2203           2 :             got_error = true;
    2204           2 :             fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
    2205           2 :             PQclear(res);
    2206             : 
    2207           2 :             res2 = PQgetResult(conn);
    2208           2 :             if (res2 != NULL)
    2209           0 :                 pg_fatal("expected NULL, got %s",
    2210             :                          PQresStatus(PQresultStatus(res2)));
    2211           2 :             break;
    2212             : 
    2213         836 :         case PGRES_TUPLES_OK:
    2214         836 :             fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
    2215         836 :             PQclear(res);
    2216             : 
    2217         836 :             res2 = PQgetResult(conn);
    2218         836 :             if (res2 != NULL)
    2219           0 :                 pg_fatal("expected NULL, got %s",
    2220             :                          PQresStatus(PQresultStatus(res2)));
    2221         836 :             break;
    2222             : 
    2223         362 :         case PGRES_PIPELINE_ABORTED:
    2224         362 :             fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
    2225         362 :             res2 = PQgetResult(conn);
    2226         362 :             if (res2 != NULL)
    2227           0 :                 pg_fatal("expected NULL, got %s",
    2228             :                          PQresStatus(PQresultStatus(res2)));
    2229         362 :             break;
    2230             : 
    2231           0 :         default:
    2232           0 :             pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
    2233             :     }
    2234             : 
    2235        1200 :     return got_error;
    2236             : }
    2237             : 
    2238             : 
    2239             : static void
    2240           0 : usage(const char *progname)
    2241             : {
    2242           0 :     fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
    2243           0 :     fprintf(stderr, "Usage:\n");
    2244           0 :     fprintf(stderr, "  %s [OPTION] tests\n", progname);
    2245           0 :     fprintf(stderr, "  %s [OPTION] TESTNAME [CONNINFO]\n", progname);
    2246           0 :     fprintf(stderr, "\nOptions:\n");
    2247           0 :     fprintf(stderr, "  -t TRACEFILE       generate a libpq trace to TRACEFILE\n");
    2248           0 :     fprintf(stderr, "  -r NUMROWS         use NUMROWS as the test size\n");
    2249           0 : }
    2250             : 
    2251             : static void
    2252           2 : print_test_list(void)
    2253             : {
    2254           2 :     printf("cancel\n");
    2255           2 :     printf("disallowed_in_pipeline\n");
    2256           2 :     printf("multi_pipelines\n");
    2257           2 :     printf("nosync\n");
    2258           2 :     printf("pipeline_abort\n");
    2259           2 :     printf("pipeline_idle\n");
    2260           2 :     printf("pipelined_insert\n");
    2261           2 :     printf("prepared\n");
    2262           2 :     printf("protocol_version\n");
    2263           2 :     printf("simple_pipeline\n");
    2264           2 :     printf("singlerow\n");
    2265           2 :     printf("transaction\n");
    2266           2 :     printf("uniqviol\n");
    2267           2 : }
    2268             : 
    2269             : int
    2270          30 : main(int argc, char **argv)
    2271             : {
    2272          30 :     const char *conninfo = "";
    2273             :     PGconn     *conn;
    2274             :     FILE       *trace;
    2275             :     char       *testname;
    2276          30 :     int         numrows = 10000;
    2277             :     PGresult   *res;
    2278             :     int         c;
    2279             : 
    2280         104 :     while ((c = getopt(argc, argv, "r:t:")) != -1)
    2281             :     {
    2282          44 :         switch (c)
    2283             :         {
    2284          26 :             case 'r':           /* numrows */
    2285          26 :                 errno = 0;
    2286          26 :                 numrows = strtol(optarg, NULL, 10);
    2287          26 :                 if (errno != 0 || numrows <= 0)
    2288             :                 {
    2289           0 :                     fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
    2290             :                             optarg);
    2291           0 :                     exit(1);
    2292             :                 }
    2293          26 :                 break;
    2294          18 :             case 't':           /* trace file */
    2295          18 :                 tracefile = pg_strdup(optarg);
    2296          18 :                 break;
    2297             :         }
    2298          74 :     }
    2299             : 
    2300          30 :     if (optind < argc)
    2301             :     {
    2302          30 :         testname = pg_strdup(argv[optind]);
    2303          30 :         optind++;
    2304             :     }
    2305             :     else
    2306             :     {
    2307           0 :         usage(argv[0]);
    2308           0 :         exit(1);
    2309             :     }
    2310             : 
    2311          30 :     if (strcmp(testname, "tests") == 0)
    2312             :     {
    2313           2 :         print_test_list();
    2314           2 :         exit(0);
    2315             :     }
    2316             : 
    2317          28 :     if (optind < argc)
    2318             :     {
    2319          28 :         conninfo = pg_strdup(argv[optind]);
    2320          28 :         optind++;
    2321             :     }
    2322             : 
    2323             :     /* Make a connection to the database */
    2324          28 :     conn = PQconnectdb(conninfo);
    2325          28 :     if (PQstatus(conn) != CONNECTION_OK)
    2326             :     {
    2327           0 :         fprintf(stderr, "Connection to database failed: %s\n",
    2328             :                 PQerrorMessage(conn));
    2329           0 :         exit_nicely(conn);
    2330             :     }
    2331             : 
    2332          28 :     res = PQexec(conn, "SET lc_messages TO \"C\"");
    2333          28 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2334           0 :         pg_fatal("failed to set \"lc_messages\": %s", PQerrorMessage(conn));
    2335          28 :     res = PQexec(conn, "SET debug_parallel_query = off");
    2336          28 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    2337           0 :         pg_fatal("failed to set \"debug_parallel_query\": %s", PQerrorMessage(conn));
    2338             : 
    2339             :     /* Set the trace file, if requested */
    2340          28 :     if (tracefile != NULL)
    2341             :     {
    2342          18 :         if (strcmp(tracefile, "-") == 0)
    2343           0 :             trace = stdout;
    2344             :         else
    2345          18 :             trace = fopen(tracefile, "w");
    2346          18 :         if (trace == NULL)
    2347           0 :             pg_fatal("could not open file \"%s\": %m", tracefile);
    2348             : 
    2349             :         /* Make it line-buffered */
    2350          18 :         setvbuf(trace, NULL, PG_IOLBF, 0);
    2351             : 
    2352          18 :         PQtrace(conn, trace);
    2353          18 :         PQsetTraceFlags(conn,
    2354             :                         PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
    2355             :     }
    2356             : 
    2357          28 :     if (strcmp(testname, "cancel") == 0)
    2358           4 :         test_cancel(conn);
    2359          24 :     else if (strcmp(testname, "disallowed_in_pipeline") == 0)
    2360           2 :         test_disallowed_in_pipeline(conn);
    2361          22 :     else if (strcmp(testname, "multi_pipelines") == 0)
    2362           2 :         test_multi_pipelines(conn);
    2363          20 :     else if (strcmp(testname, "nosync") == 0)
    2364           2 :         test_nosync(conn);
    2365          18 :     else if (strcmp(testname, "pipeline_abort") == 0)
    2366           2 :         test_pipeline_abort(conn);
    2367          16 :     else if (strcmp(testname, "pipeline_idle") == 0)
    2368           2 :         test_pipeline_idle(conn);
    2369          14 :     else if (strcmp(testname, "pipelined_insert") == 0)
    2370           2 :         test_pipelined_insert(conn, numrows);
    2371          12 :     else if (strcmp(testname, "prepared") == 0)
    2372           2 :         test_prepared(conn);
    2373          10 :     else if (strcmp(testname, "protocol_version") == 0)
    2374           2 :         test_protocol_version(conn);
    2375           8 :     else if (strcmp(testname, "simple_pipeline") == 0)
    2376           2 :         test_simple_pipeline(conn);
    2377           6 :     else if (strcmp(testname, "singlerow") == 0)
    2378           2 :         test_singlerowmode(conn);
    2379           4 :     else if (strcmp(testname, "transaction") == 0)
    2380           2 :         test_transaction(conn);
    2381           2 :     else if (strcmp(testname, "uniqviol") == 0)
    2382           2 :         test_uniqviol(conn);
    2383             :     else
    2384             :     {
    2385           0 :         fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
    2386           0 :         exit(1);
    2387             :     }
    2388             : 
    2389             :     /* close the connection to the database and cleanup */
    2390          28 :     PQfinish(conn);
    2391          28 :     return 0;
    2392             : }

Generated by: LCOV version 1.14