LCOV - code coverage report
Current view: top level - src/test/modules/libpq_pipeline - libpq_pipeline.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 728 1018 71.5 %
Date: 2024-02-21 23:10:58 Functions: 15 18 83.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * libpq_pipeline.c
       4             :  *      Verify libpq pipeline execution functionality
       5             :  *
       6             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *      src/test/modules/libpq_pipeline/libpq_pipeline.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres_fe.h"
      17             : 
      18             : #include <sys/select.h>
      19             : #include <sys/time.h>
      20             : 
      21             : #include "catalog/pg_type_d.h"
      22             : #include "common/fe_memutils.h"
      23             : #include "libpq-fe.h"
      24             : #include "pg_getopt.h"
      25             : #include "portability/instr_time.h"
      26             : 
      27             : 
      28             : static void exit_nicely(PGconn *conn);
      29             : static void pg_attribute_noreturn() pg_fatal_impl(int line, const char *fmt,...)
      30             :             pg_attribute_printf(2, 3);
      31             : static bool process_result(PGconn *conn, PGresult *res, int results,
      32             :                            int numsent);
      33             : 
      34             : const char *const progname = "libpq_pipeline";
      35             : 
      36             : /* Options and defaults */
      37             : char       *tracefile = NULL;   /* path to PQtrace() file */
      38             : 
      39             : 
      40             : #ifdef DEBUG_OUTPUT
      41             : #define pg_debug(...)  do { fprintf(stderr, __VA_ARGS__); } while (0)
      42             : #else
      43             : #define pg_debug(...)
      44             : #endif
      45             : 
      46             : static const char *const drop_table_sql =
      47             : "DROP TABLE IF EXISTS pq_pipeline_demo";
      48             : static const char *const create_table_sql =
      49             : "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
      50             : "int8filler int8);";
      51             : static const char *const insert_sql =
      52             : "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
      53             : static const char *const insert_sql2 =
      54             : "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
      55             : 
      56             : /* max char length of an int32/64, plus sign and null terminator */
      57             : #define MAXINTLEN 12
      58             : #define MAXINT8LEN 20
      59             : 
      60             : static void
      61           0 : exit_nicely(PGconn *conn)
      62             : {
      63           0 :     PQfinish(conn);
      64           0 :     exit(1);
      65             : }
      66             : 
      67             : /*
      68             :  * Print an error to stderr and terminate the program.
      69             :  */
      70             : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
      71             : static void
      72             : pg_attribute_noreturn()
      73           0 : pg_fatal_impl(int line, const char *fmt,...)
      74             : {
      75             :     va_list     args;
      76             : 
      77             : 
      78           0 :     fflush(stdout);
      79             : 
      80           0 :     fprintf(stderr, "\n%s:%d: ", progname, line);
      81           0 :     va_start(args, fmt);
      82           0 :     vfprintf(stderr, fmt, args);
      83           0 :     va_end(args);
      84             :     Assert(fmt[strlen(fmt) - 1] != '\n');
      85           0 :     fprintf(stderr, "\n");
      86           0 :     exit(1);
      87             : }
      88             : 
      89             : static void
      90           2 : test_disallowed_in_pipeline(PGconn *conn)
      91             : {
      92           2 :     PGresult   *res = NULL;
      93             : 
      94           2 :     fprintf(stderr, "test error cases... ");
      95             : 
      96           2 :     if (PQisnonblocking(conn))
      97           0 :         pg_fatal("Expected blocking connection mode");
      98             : 
      99           2 :     if (PQenterPipelineMode(conn) != 1)
     100           0 :         pg_fatal("Unable to enter pipeline mode");
     101             : 
     102           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     103           0 :         pg_fatal("Pipeline mode not activated properly");
     104             : 
     105             :     /* PQexec should fail in pipeline mode */
     106           2 :     res = PQexec(conn, "SELECT 1");
     107           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
     108           0 :         pg_fatal("PQexec should fail in pipeline mode but succeeded");
     109           2 :     if (strcmp(PQerrorMessage(conn),
     110             :                "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
     111           0 :         pg_fatal("did not get expected error message; got: \"%s\"",
     112             :                  PQerrorMessage(conn));
     113             : 
     114             :     /* PQsendQuery should fail in pipeline mode */
     115           2 :     if (PQsendQuery(conn, "SELECT 1") != 0)
     116           0 :         pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
     117           2 :     if (strcmp(PQerrorMessage(conn),
     118             :                "PQsendQuery not allowed in pipeline mode\n") != 0)
     119           0 :         pg_fatal("did not get expected error message; got: \"%s\"",
     120             :                  PQerrorMessage(conn));
     121             : 
     122             :     /* Entering pipeline mode when already in pipeline mode is OK */
     123           2 :     if (PQenterPipelineMode(conn) != 1)
     124           0 :         pg_fatal("re-entering pipeline mode should be a no-op but failed");
     125             : 
     126           2 :     if (PQisBusy(conn) != 0)
     127           0 :         pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
     128             : 
     129             :     /* ok, back to normal command mode */
     130           2 :     if (PQexitPipelineMode(conn) != 1)
     131           0 :         pg_fatal("couldn't exit idle empty pipeline mode");
     132             : 
     133           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     134           0 :         pg_fatal("Pipeline mode not terminated properly");
     135             : 
     136             :     /* exiting pipeline mode when not in pipeline mode should be a no-op */
     137           2 :     if (PQexitPipelineMode(conn) != 1)
     138           0 :         pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
     139             : 
     140             :     /* can now PQexec again */
     141           2 :     res = PQexec(conn, "SELECT 1");
     142           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     143           0 :         pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
     144             :                  PQerrorMessage(conn));
     145             : 
     146           2 :     fprintf(stderr, "ok\n");
     147           2 : }
     148             : 
     149             : static void
     150           2 : test_multi_pipelines(PGconn *conn)
     151             : {
     152           2 :     PGresult   *res = NULL;
     153           2 :     const char *dummy_params[1] = {"1"};
     154           2 :     Oid         dummy_param_oids[1] = {INT4OID};
     155             : 
     156           2 :     fprintf(stderr, "multi pipeline... ");
     157             : 
     158             :     /*
     159             :      * Queue up a couple of small pipelines and process each without returning
     160             :      * to command mode first.
     161             :      */
     162           2 :     if (PQenterPipelineMode(conn) != 1)
     163           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     164             : 
     165             :     /* first pipeline */
     166           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     167             :                           dummy_params, NULL, NULL, 0) != 1)
     168           0 :         pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
     169             : 
     170           2 :     if (PQpipelineSync(conn) != 1)
     171           0 :         pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
     172             : 
     173             :     /* second pipeline */
     174           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     175             :                           dummy_params, NULL, NULL, 0) != 1)
     176           0 :         pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
     177             : 
     178             :     /* For this one, queue the sync without flushing (PQsendPipelineSync). */
     179           2 :     if (PQsendPipelineSync(conn) != 1)
     180           0 :         pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
     181             : 
     182             :     /* third pipeline */
     183           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     184             :                           dummy_params, NULL, NULL, 0) != 1)
     185           0 :         pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
     186             : 
     187           2 :     if (PQpipelineSync(conn) != 1)
     188           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     189             : 
     190             :     /* OK, start processing the results */
     191             : 
     192             :     /* first pipeline */
     193             : 
     194           2 :     res = PQgetResult(conn);
     195           2 :     if (res == NULL)
     196           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     197             :                  PQerrorMessage(conn));
     198             : 
     199           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     200           0 :         pg_fatal("Unexpected result code %s from first pipeline item",
     201             :                  PQresStatus(PQresultStatus(res)));
     202           2 :     PQclear(res);
     203           2 :     res = NULL;
     204             : 
     205           2 :     if (PQgetResult(conn) != NULL)
     206           0 :         pg_fatal("PQgetResult returned something extra after first result");
     207             : 
     208           2 :     if (PQexitPipelineMode(conn) != 0)
     209           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
     210             : 
     211           2 :     res = PQgetResult(conn);
     212           2 :     if (res == NULL)
     213           0 :         pg_fatal("PQgetResult returned null when sync result expected: %s",
     214             :                  PQerrorMessage(conn));
     215             : 
     216           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     217           0 :         pg_fatal("Unexpected result code %s instead of sync result, error: %s",
     218             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     219           2 :     PQclear(res);
     220             : 
     221             :     /* second pipeline */
     222             : 
     223           2 :     res = PQgetResult(conn);
     224           2 :     if (res == NULL)
     225           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     226             :                  PQerrorMessage(conn));
     227             : 
     228           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     229           0 :         pg_fatal("Unexpected result code %s from second pipeline item",
     230             :                  PQresStatus(PQresultStatus(res)));
     231           2 :     PQclear(res);
     232           2 :     res = NULL;
     233             : 
     234           2 :     if (PQgetResult(conn) != NULL)
     235           0 :         pg_fatal("PQgetResult returned something extra after first result");
     236             : 
     237           2 :     if (PQexitPipelineMode(conn) != 0)
     238           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
     239             : 
     240           2 :     res = PQgetResult(conn);
     241           2 :     if (res == NULL)
     242           0 :         pg_fatal("PQgetResult returned null when sync result expected: %s",
     243             :                  PQerrorMessage(conn));
     244             : 
     245           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     246           0 :         pg_fatal("Unexpected result code %s instead of sync result, error: %s",
     247             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     248           2 :     PQclear(res);
     249             : 
     250             :     /* third pipeline */
     251             : 
     252           2 :     res = PQgetResult(conn);
     253           2 :     if (res == NULL)
     254           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     255             :                  PQerrorMessage(conn));
     256             : 
     257           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     258           0 :         pg_fatal("Unexpected result code %s from third pipeline item",
     259             :                  PQresStatus(PQresultStatus(res)));
     260             : 
     261           2 :     res = PQgetResult(conn);
     262           2 :     if (res != NULL)
     263           0 :         pg_fatal("Expected null result, got %s",
     264             :                  PQresStatus(PQresultStatus(res)));
     265             : 
     266           2 :     res = PQgetResult(conn);
     267           2 :     if (res == NULL)
     268           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     269             :                  PQerrorMessage(conn));
     270             : 
     271           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     272           0 :         pg_fatal("Unexpected result code %s from second pipeline sync",
     273             :                  PQresStatus(PQresultStatus(res)));
     274             : 
     275             :     /* We're still in pipeline mode ... */
     276           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     277           0 :         pg_fatal("Fell out of pipeline mode somehow");
     278             : 
     279             :     /* until we end it, which we can safely do now */
     280           2 :     if (PQexitPipelineMode(conn) != 1)
     281           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     282             :                  PQerrorMessage(conn));
     283             : 
     284           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     285           0 :         pg_fatal("exiting pipeline mode didn't seem to work");
     286             : 
     287           2 :     fprintf(stderr, "ok\n");
     288           2 : }
     289             : 
     290             : /*
     291             :  * Test behavior when a pipeline dispatches a number of commands that are
     292             :  * not flushed by a sync point.
     293             :  */
     294             : static void
     295           2 : test_nosync(PGconn *conn)
     296             : {
     297           2 :     int         numqueries = 10;
     298           2 :     int         results = 0;
     299           2 :     int         sock = PQsocket(conn);
     300             : 
     301           2 :     fprintf(stderr, "nosync... ");
     302             : 
     303           2 :     if (sock < 0)
     304           0 :         pg_fatal("invalid socket");
     305             : 
     306           2 :     if (PQenterPipelineMode(conn) != 1)
     307           0 :         pg_fatal("could not enter pipeline mode");
     308          22 :     for (int i = 0; i < numqueries; i++)
     309             :     {
     310             :         fd_set      input_mask;
     311             :         struct timeval tv;
     312             : 
     313          20 :         if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
     314             :                               0, NULL, NULL, NULL, NULL, 0) != 1)
     315           0 :             pg_fatal("error sending select: %s", PQerrorMessage(conn));
     316          20 :         PQflush(conn);
     317             : 
     318             :         /*
     319             :          * If the server has written anything to us, read (some of) it now.
     320             :          */
     321          20 :         FD_ZERO(&input_mask);
     322          20 :         FD_SET(sock, &input_mask);
     323          20 :         tv.tv_sec = 0;
     324          20 :         tv.tv_usec = 0;
     325          20 :         if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
     326             :         {
     327           0 :             fprintf(stderr, "select() failed: %s\n", strerror(errno));
     328           0 :             exit_nicely(conn);
     329             :         }
     330          20 :         if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
     331           0 :             pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
     332             :     }
     333             : 
     334             :     /* tell server to flush its output buffer */
     335           2 :     if (PQsendFlushRequest(conn) != 1)
     336           0 :         pg_fatal("failed to send flush request");
     337           2 :     PQflush(conn);
     338             : 
     339             :     /* Now read all results */
     340             :     for (;;)
     341          18 :     {
     342             :         PGresult   *res;
     343             : 
     344          20 :         res = PQgetResult(conn);
     345             : 
     346             :         /* NULL results are only expected after TUPLES_OK */
     347          20 :         if (res == NULL)
     348           0 :             pg_fatal("got unexpected NULL result after %d results", results);
     349             : 
     350             :         /* We expect exactly one TUPLES_OK result for each query we sent */
     351          20 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
     352             :         {
     353             :             PGresult   *res2;
     354             : 
     355             :             /* and one NULL result should follow each */
     356          20 :             res2 = PQgetResult(conn);
     357          20 :             if (res2 != NULL)
     358           0 :                 pg_fatal("expected NULL, got %s",
     359             :                          PQresStatus(PQresultStatus(res2)));
     360          20 :             PQclear(res);
     361          20 :             results++;
     362             : 
     363             :             /* if we're done, we're done */
     364          20 :             if (results == numqueries)
     365           2 :                 break;
     366             : 
     367          18 :             continue;
     368             :         }
     369             : 
     370             :         /* anything else is unexpected */
     371           0 :         pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
     372             :     }
     373             : 
     374           2 :     fprintf(stderr, "ok\n");
     375           2 : }
     376             : 
     377             : /*
     378             :  * When an operation in a pipeline fails, the rest of that pipeline is skipped.
     379             :  * We still have to fetch a result for each skipped item, but each one is just
     380             :  * a PGRES_PIPELINE_ABORTED result.
     381             :  *
     382             :  * This intentionally doesn't use a transaction to wrap the pipeline. You should
     383             :  * usually use an xact, but in this case we want to observe the effects of each
     384             :  * statement.
     385             :  */
     386             : static void
     387           2 : test_pipeline_abort(PGconn *conn)
     388             : {
     389           2 :     PGresult   *res = NULL;
     390           2 :     const char *dummy_params[1] = {"1"};
     391           2 :     Oid         dummy_param_oids[1] = {INT4OID};
     392             :     int         i;
     393             :     int         gotrows;
     394             :     bool        goterror;
     395             : 
     396           2 :     fprintf(stderr, "aborted pipeline... ");
     397             : 
     398           2 :     res = PQexec(conn, drop_table_sql);
     399           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     400           0 :         pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
     401             : 
     402           2 :     res = PQexec(conn, create_table_sql);
     403           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     404           0 :         pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
     405             : 
     406             :     /*
     407             :      * Queue up a couple of small pipelines and process each without returning
     408             :      * to command mode first. Make sure the second operation in the first
     409             :      * pipeline ERRORs.
     410             :      */
     411           2 :     if (PQenterPipelineMode(conn) != 1)
     412           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     413             : 
     414           2 :     dummy_params[0] = "1";
     415           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     416             :                           dummy_params, NULL, NULL, 0) != 1)
     417           0 :         pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
     418             : 
     419           2 :     if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
     420             :                           1, dummy_param_oids, dummy_params,
     421             :                           NULL, NULL, 0) != 1)
     422           0 :         pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
     423             : 
     424           2 :     dummy_params[0] = "2";
     425           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     426             :                           dummy_params, NULL, NULL, 0) != 1)
     427           0 :         pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
     428             : 
     429           2 :     if (PQpipelineSync(conn) != 1)
     430           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     431             : 
     432           2 :     dummy_params[0] = "3";
     433           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     434             :                           dummy_params, NULL, NULL, 0) != 1)
     435           0 :         pg_fatal("dispatching second-pipeline insert failed: %s",
     436             :                  PQerrorMessage(conn));
     437             : 
     438           2 :     if (PQpipelineSync(conn) != 1)
     439           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     440             : 
     441             :     /*
     442             :      * OK, start processing the pipeline results.
     443             :      *
     444             :      * We should get a command-ok for the first insert, then a fatal error for
     445             :      * the failing SELECT and a pipeline-aborted result for the second insert,
     446             :      * a pipeline sync, then a command-ok and a sync for the second pipeline.
     447             :      */
     448           2 :     res = PQgetResult(conn);
     449           2 :     if (res == NULL)
     450           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     451           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     452           0 :         pg_fatal("Unexpected result status %s: %s",
     453             :                  PQresStatus(PQresultStatus(res)),
     454             :                  PQresultErrorMessage(res));
     455           2 :     PQclear(res);
     456             : 
     457             :     /* NULL result to signal end-of-results for this command */
     458           2 :     if ((res = PQgetResult(conn)) != NULL)
     459           0 :         pg_fatal("Expected null result, got %s",
     460             :                  PQresStatus(PQresultStatus(res)));
     461             : 
     462             :     /* Second query caused error, so we expect an error next */
     463           2 :     res = PQgetResult(conn);
     464           2 :     if (res == NULL)
     465           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     466           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
     467           0 :         pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
     468             :                  PQresStatus(PQresultStatus(res)));
     469           2 :     PQclear(res);
     470             : 
     471             :     /* NULL result to signal end-of-results for this command */
     472           2 :     if ((res = PQgetResult(conn)) != NULL)
     473           0 :         pg_fatal("Expected null result, got %s",
     474             :                  PQresStatus(PQresultStatus(res)));
     475             : 
     476             :     /*
     477             :      * pipeline should now be aborted.
     478             :      *
     479             :      * Note that we could still queue more queries at this point if we wanted;
     480             :      * they'd get added to a new third pipeline since we've already sent a
     481             :      * second. The aborted flag relates only to the pipeline being received.
     482             :      */
     483           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
     484           0 :         pg_fatal("pipeline should be flagged as aborted but isn't");
     485             : 
     486             :     /* third query in pipeline, the second insert */
     487           2 :     res = PQgetResult(conn);
     488           2 :     if (res == NULL)
     489           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     490           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
     491           0 :         pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
     492             :                  PQresStatus(PQresultStatus(res)));
     493           2 :     PQclear(res);
     494             : 
     495             :     /* NULL result to signal end-of-results for this command */
     496           2 :     if ((res = PQgetResult(conn)) != NULL)
     497           0 :         pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
     498             : 
     499           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
     500           0 :         pg_fatal("pipeline should be flagged as aborted but isn't");
     501             : 
     502             :     /* Ensure we're still in pipeline */
     503           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     504           0 :         pg_fatal("Fell out of pipeline mode somehow");
     505             : 
     506             :     /*
     507             :      * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
     508             :      *
     509             :      * (This is so clients know to start processing results normally again and
     510             :      * can tell the difference between skipped commands and the sync.)
     511             :      */
     512           2 :     res = PQgetResult(conn);
     513           2 :     if (res == NULL)
     514           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     515           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     516           0 :         pg_fatal("Unexpected result code from first pipeline sync\n"
     517             :                  "Expected PGRES_PIPELINE_SYNC, got %s",
     518             :                  PQresStatus(PQresultStatus(res)));
     519           2 :     PQclear(res);
     520             : 
     521           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
     522           0 :         pg_fatal("sync should've cleared the aborted flag but didn't");
     523             : 
     524             :     /* We're still in pipeline mode... */
     525           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     526           0 :         pg_fatal("Fell out of pipeline mode somehow");
     527             : 
     528             :     /* the insert from the second pipeline */
     529           2 :     res = PQgetResult(conn);
     530           2 :     if (res == NULL)
     531           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     532           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     533           0 :         pg_fatal("Unexpected result code %s from first item in second pipeline",
     534             :                  PQresStatus(PQresultStatus(res)));
     535           2 :     PQclear(res);
     536             : 
     537             :     /* Read the NULL result at the end of the command */
     538           2 :     if ((res = PQgetResult(conn)) != NULL)
     539           0 :         pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
     540             : 
     541             :     /* the second pipeline sync */
     542           2 :     if ((res = PQgetResult(conn)) == NULL)
     543           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     544           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     545           0 :         pg_fatal("Unexpected result code %s from second pipeline sync",
     546             :                  PQresStatus(PQresultStatus(res)));
     547           2 :     PQclear(res);
     548             : 
     549           2 :     if ((res = PQgetResult(conn)) != NULL)
     550           0 :         pg_fatal("Expected null result, got %s: %s",
     551             :                  PQresStatus(PQresultStatus(res)),
     552             :                  PQerrorMessage(conn));
     553             : 
     554             :     /* Try to send two queries in one command */
     555           2 :     if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
     556           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
     557           2 :     if (PQpipelineSync(conn) != 1)
     558           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     559           2 :     goterror = false;
     560           4 :     while ((res = PQgetResult(conn)) != NULL)
     561             :     {
     562           2 :         switch (PQresultStatus(res))
     563             :         {
     564           2 :             case PGRES_FATAL_ERROR:
     565           2 :                 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
     566           0 :                     pg_fatal("expected error about multiple commands, got %s",
     567             :                              PQerrorMessage(conn));
     568           2 :                 printf("got expected %s", PQerrorMessage(conn));
     569           2 :                 goterror = true;
     570           2 :                 break;
     571           0 :             default:
     572           0 :                 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
     573             :                 break;
     574             :         }
     575             :     }
     576           2 :     if (!goterror)
     577           0 :         pg_fatal("did not get cannot-insert-multiple-commands error");
     578           2 :     res = PQgetResult(conn);
     579           2 :     if (res == NULL)
     580           0 :         pg_fatal("got NULL result");
     581           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     582           0 :         pg_fatal("Unexpected result code %s from pipeline sync",
     583             :                  PQresStatus(PQresultStatus(res)));
     584           2 :     fprintf(stderr, "ok\n");
     585             : 
     586             :     /* Test single-row mode with an error partways */
     587           2 :     if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
     588             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
     589           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
     590           2 :     if (PQpipelineSync(conn) != 1)
     591           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     592           2 :     PQsetSingleRowMode(conn);
     593           2 :     goterror = false;
     594           2 :     gotrows = 0;
     595          10 :     while ((res = PQgetResult(conn)) != NULL)
     596             :     {
     597           8 :         switch (PQresultStatus(res))
     598             :         {
     599           6 :             case PGRES_SINGLE_TUPLE:
     600           6 :                 printf("got row: %s\n", PQgetvalue(res, 0, 0));
     601           6 :                 gotrows++;
     602           6 :                 break;
     603           2 :             case PGRES_FATAL_ERROR:
     604           2 :                 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
     605           0 :                     pg_fatal("expected division-by-zero, got: %s (%s)",
     606             :                              PQerrorMessage(conn),
     607             :                              PQresultErrorField(res, PG_DIAG_SQLSTATE));
     608           2 :                 printf("got expected division-by-zero\n");
     609           2 :                 goterror = true;
     610           2 :                 break;
     611           0 :             default:
     612           0 :                 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
     613             :         }
     614           8 :         PQclear(res);
     615             :     }
     616           2 :     if (!goterror)
     617           0 :         pg_fatal("did not get division-by-zero error");
     618           2 :     if (gotrows != 3)
     619           0 :         pg_fatal("did not get three rows");
     620             :     /* the third pipeline sync */
     621           2 :     if ((res = PQgetResult(conn)) == NULL)
     622           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     623           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     624           0 :         pg_fatal("Unexpected result code %s from third pipeline sync",
     625             :                  PQresStatus(PQresultStatus(res)));
     626           2 :     PQclear(res);
     627             : 
     628             :     /* We're still in pipeline mode... */
     629           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     630           0 :         pg_fatal("Fell out of pipeline mode somehow");
     631             : 
     632             :     /* until we end it, which we can safely do now */
     633           2 :     if (PQexitPipelineMode(conn) != 1)
     634           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     635             :                  PQerrorMessage(conn));
     636             : 
     637           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     638           0 :         pg_fatal("exiting pipeline mode didn't seem to work");
     639             : 
     640             :     /*-
     641             :      * Since we fired the pipelines off without a surrounding xact, the results
     642             :      * should be:
     643             :      *
     644             :      * - Implicit xact started by server around 1st pipeline
     645             :      * - First insert applied
     646             :      * - Second statement aborted xact
     647             :      * - Third insert skipped
     648             :      * - Sync rolled back first implicit xact
     649             :      * - Implicit xact created by server around 2nd pipeline
     650             :      * - insert applied from 2nd pipeline
     651             :      * - Sync commits 2nd xact
     652             :      *
     653             :      * So we should only have the value 3 that we inserted.
     654             :      */
     655           2 :     res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
     656             : 
     657           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     658           0 :         pg_fatal("Expected tuples, got %s: %s",
     659             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     660           2 :     if (PQntuples(res) != 1)
     661           0 :         pg_fatal("expected 1 result, got %d", PQntuples(res));
     662           4 :     for (i = 0; i < PQntuples(res); i++)
     663             :     {
     664           2 :         const char *val = PQgetvalue(res, i, 0);
     665             : 
     666           2 :         if (strcmp(val, "3") != 0)
     667           0 :             pg_fatal("expected only insert with value 3, got %s", val);
     668             :     }
     669             : 
     670           2 :     PQclear(res);
     671             : 
     672           2 :     fprintf(stderr, "ok\n");
     673           2 : }
     674             : 
     675             : /* State machine enum for test_pipelined_insert */
     676             : enum PipelineInsertStep
     677             : {
     678             :     BI_BEGIN_TX,
     679             :     BI_DROP_TABLE,
     680             :     BI_CREATE_TABLE,
     681             :     BI_PREPARE,
     682             :     BI_INSERT_ROWS,
     683             :     BI_COMMIT_TX,
     684             :     BI_SYNC,
     685             :     BI_DONE,
     686             : };
     687             : 
     688             : static void
     689           2 : test_pipelined_insert(PGconn *conn, int n_rows)
     690             : {
     691           2 :     Oid         insert_param_oids[2] = {INT4OID, INT8OID};
     692             :     const char *insert_params[2];
     693             :     char        insert_param_0[MAXINTLEN];
     694             :     char        insert_param_1[MAXINT8LEN];
     695           2 :     enum PipelineInsertStep send_step = BI_BEGIN_TX,
     696           2 :                 recv_step = BI_BEGIN_TX;
     697             :     int         rows_to_send,
     698             :                 rows_to_receive;
     699             : 
     700           2 :     insert_params[0] = insert_param_0;
     701           2 :     insert_params[1] = insert_param_1;
     702             : 
     703           2 :     rows_to_send = rows_to_receive = n_rows;
     704             : 
     705             :     /*
     706             :      * Do a pipelined insert into a table created at the start of the pipeline
     707             :      */
     708           2 :     if (PQenterPipelineMode(conn) != 1)
     709           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     710             : 
     711           8 :     while (send_step != BI_PREPARE)
     712             :     {
     713             :         const char *sql;
     714             : 
     715           6 :         switch (send_step)
     716             :         {
     717           2 :             case BI_BEGIN_TX:
     718           2 :                 sql = "BEGIN TRANSACTION";
     719           2 :                 send_step = BI_DROP_TABLE;
     720           2 :                 break;
     721             : 
     722           2 :             case BI_DROP_TABLE:
     723           2 :                 sql = drop_table_sql;
     724           2 :                 send_step = BI_CREATE_TABLE;
     725           2 :                 break;
     726             : 
     727           2 :             case BI_CREATE_TABLE:
     728           2 :                 sql = create_table_sql;
     729           2 :                 send_step = BI_PREPARE;
     730           2 :                 break;
     731             : 
     732           0 :             default:
     733           0 :                 pg_fatal("invalid state");
     734             :                 sql = NULL;     /* keep compiler quiet */
     735             :         }
     736             : 
     737             :         pg_debug("sending: %s\n", sql);
     738           6 :         if (PQsendQueryParams(conn, sql,
     739             :                               0, NULL, NULL, NULL, NULL, 0) != 1)
     740           0 :             pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
     741             :     }
     742             : 
     743             :     Assert(send_step == BI_PREPARE);
     744             :     pg_debug("sending: %s\n", insert_sql2);
     745           2 :     if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
     746           0 :         pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
     747           2 :     send_step = BI_INSERT_ROWS;
     748             : 
     749             :     /*
     750             :      * Now we start inserting. We'll be sending enough data that we could fill
     751             :      * our output buffer, so to avoid deadlocking we need to enter nonblocking
     752             :      * mode and consume input while we send more output. As results of each
     753             :      * query are processed we should pop them to allow processing of the next
     754             :      * query. There's no need to finish the pipeline before processing
     755             :      * results.
     756             :      */
     757           2 :     if (PQsetnonblocking(conn, 1) != 0)
     758           0 :         pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
     759             : 
     760       28462 :     while (recv_step != BI_DONE)
     761             :     {
     762             :         int         sock;
     763             :         fd_set      input_mask;
     764             :         fd_set      output_mask;
     765             : 
     766       28460 :         sock = PQsocket(conn);
     767             : 
     768       28460 :         if (sock < 0)
     769           0 :             break;              /* shouldn't happen */
     770             : 
     771       28460 :         FD_ZERO(&input_mask);
     772       28460 :         FD_SET(sock, &input_mask);
     773       28460 :         FD_ZERO(&output_mask);
     774       28460 :         FD_SET(sock, &output_mask);
     775             : 
     776       28460 :         if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
     777             :         {
     778           0 :             fprintf(stderr, "select() failed: %s\n", strerror(errno));
     779           0 :             exit_nicely(conn);
     780             :         }
     781             : 
     782             :         /*
     783             :          * Process any results, so we keep the server's output buffer free
     784             :          * flowing and it can continue to process input
     785             :          */
     786       28460 :         if (FD_ISSET(sock, &input_mask))
     787             :         {
     788           6 :             PQconsumeInput(conn);
     789             : 
     790             :             /* Read until we'd block if we tried to read */
     791        2828 :             while (!PQisBusy(conn) && recv_step < BI_DONE)
     792             :             {
     793             :                 PGresult   *res;
     794        2822 :                 const char *cmdtag = "";
     795        2822 :                 const char *description = "";
     796             :                 int         status;
     797             : 
     798             :                 /*
     799             :                  * Read next result.  If no more results from this query,
     800             :                  * advance to the next query
     801             :                  */
     802        2822 :                 res = PQgetResult(conn);
     803        2822 :                 if (res == NULL)
     804        1410 :                     continue;
     805             : 
     806        1412 :                 status = PGRES_COMMAND_OK;
     807        1412 :                 switch (recv_step)
     808             :                 {
     809           2 :                     case BI_BEGIN_TX:
     810           2 :                         cmdtag = "BEGIN";
     811           2 :                         recv_step++;
     812           2 :                         break;
     813           2 :                     case BI_DROP_TABLE:
     814           2 :                         cmdtag = "DROP TABLE";
     815           2 :                         recv_step++;
     816           2 :                         break;
     817           2 :                     case BI_CREATE_TABLE:
     818           2 :                         cmdtag = "CREATE TABLE";
     819           2 :                         recv_step++;
     820           2 :                         break;
     821           2 :                     case BI_PREPARE:
     822           2 :                         cmdtag = "";
     823           2 :                         description = "PREPARE";
     824           2 :                         recv_step++;
     825           2 :                         break;
     826        1400 :                     case BI_INSERT_ROWS:
     827        1400 :                         cmdtag = "INSERT";
     828        1400 :                         rows_to_receive--;
     829        1400 :                         if (rows_to_receive == 0)
     830           2 :                             recv_step++;
     831        1400 :                         break;
     832           2 :                     case BI_COMMIT_TX:
     833           2 :                         cmdtag = "COMMIT";
     834           2 :                         recv_step++;
     835           2 :                         break;
     836           2 :                     case BI_SYNC:
     837           2 :                         cmdtag = "";
     838           2 :                         description = "SYNC";
     839           2 :                         status = PGRES_PIPELINE_SYNC;
     840           2 :                         recv_step++;
     841           2 :                         break;
     842           0 :                     case BI_DONE:
     843             :                         /* unreachable */
     844           0 :                         pg_fatal("unreachable state");
     845             :                 }
     846             : 
     847        1412 :                 if (PQresultStatus(res) != status)
     848           0 :                     pg_fatal("%s reported status %s, expected %s\n"
     849             :                              "Error message: \"%s\"",
     850             :                              description, PQresStatus(PQresultStatus(res)),
     851             :                              PQresStatus(status), PQerrorMessage(conn));
     852             : 
     853        1412 :                 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
     854           0 :                     pg_fatal("%s expected command tag '%s', got '%s'",
     855             :                              description, cmdtag, PQcmdStatus(res));
     856             : 
     857             :                 pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
     858             : 
     859        1412 :                 PQclear(res);
     860             :             }
     861             :         }
     862             : 
     863             :         /* Write more rows and/or the end pipeline message, if needed */
     864       28460 :         if (FD_ISSET(sock, &output_mask))
     865             :         {
     866       28456 :             PQflush(conn);
     867             : 
     868       28456 :             if (send_step == BI_INSERT_ROWS)
     869             :             {
     870        1400 :                 snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
     871             :                 /* use up some buffer space with a wide value */
     872        1400 :                 snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
     873             : 
     874        1400 :                 if (PQsendQueryPrepared(conn, "my_insert",
     875             :                                         2, insert_params, NULL, NULL, 0) == 1)
     876             :                 {
     877             :                     pg_debug("sent row %d\n", rows_to_send);
     878             : 
     879        1400 :                     rows_to_send--;
     880        1400 :                     if (rows_to_send == 0)
     881           2 :                         send_step++;
     882             :                 }
     883             :                 else
     884             :                 {
     885             :                     /*
     886             :                      * in nonblocking mode, so it's OK for an insert to fail
     887             :                      * to send
     888             :                      */
     889           0 :                     fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
     890             :                             rows_to_send, PQerrorMessage(conn));
     891             :                 }
     892             :             }
     893       27056 :             else if (send_step == BI_COMMIT_TX)
     894             :             {
     895           2 :                 if (PQsendQueryParams(conn, "COMMIT",
     896             :                                       0, NULL, NULL, NULL, NULL, 0) == 1)
     897             :                 {
     898             :                     pg_debug("sent COMMIT\n");
     899           2 :                     send_step++;
     900             :                 }
     901             :                 else
     902             :                 {
     903           0 :                     fprintf(stderr, "WARNING: failed to send commit: %s\n",
     904             :                             PQerrorMessage(conn));
     905             :                 }
     906             :             }
     907       27054 :             else if (send_step == BI_SYNC)
     908             :             {
     909           2 :                 if (PQpipelineSync(conn) == 1)
     910             :                 {
     911           2 :                     fprintf(stdout, "pipeline sync sent\n");
     912           2 :                     send_step++;
     913             :                 }
     914             :                 else
     915             :                 {
     916           0 :                     fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
     917             :                             PQerrorMessage(conn));
     918             :                 }
     919             :             }
     920             :         }
     921             :     }
     922             : 
     923             :     /* We've got the sync message and the pipeline should be done */
     924           2 :     if (PQexitPipelineMode(conn) != 1)
     925           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     926             :                  PQerrorMessage(conn));
     927             : 
     928           2 :     if (PQsetnonblocking(conn, 0) != 0)
     929           0 :         pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
     930             : 
     931           2 :     fprintf(stderr, "ok\n");
     932           2 : }
     933             : 
     934             : static void
     935           2 : test_prepared(PGconn *conn)
     936             : {
     937           2 :     PGresult   *res = NULL;
     938           2 :     Oid         param_oids[1] = {INT4OID};
     939             :     Oid         expected_oids[4];
     940             :     Oid         typ;
     941             : 
     942           2 :     fprintf(stderr, "prepared... ");
     943             : 
     944           2 :     if (PQenterPipelineMode(conn) != 1)
     945           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     946           2 :     if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
     947             :                       "interval '1 sec'",
     948             :                       1, param_oids) != 1)
     949           0 :         pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
     950           2 :     expected_oids[0] = INT4OID;
     951           2 :     expected_oids[1] = TEXTOID;
     952           2 :     expected_oids[2] = NUMERICOID;
     953           2 :     expected_oids[3] = INTERVALOID;
     954           2 :     if (PQsendDescribePrepared(conn, "select_one") != 1)
     955           0 :         pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
     956           2 :     if (PQpipelineSync(conn) != 1)
     957           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     958             : 
     959           2 :     res = PQgetResult(conn);
     960           2 :     if (res == NULL)
     961           0 :         pg_fatal("PQgetResult returned null");
     962           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     963           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
     964           2 :     PQclear(res);
     965           2 :     res = PQgetResult(conn);
     966           2 :     if (res != NULL)
     967           0 :         pg_fatal("expected NULL result");
     968             : 
     969           2 :     res = PQgetResult(conn);
     970           2 :     if (res == NULL)
     971           0 :         pg_fatal("PQgetResult returned NULL");
     972           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     973           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
     974           2 :     if (PQnfields(res) != lengthof(expected_oids))
     975           0 :         pg_fatal("expected %zu columns, got %d",
     976             :                  lengthof(expected_oids), PQnfields(res));
     977          10 :     for (int i = 0; i < PQnfields(res); i++)
     978             :     {
     979           8 :         typ = PQftype(res, i);
     980           8 :         if (typ != expected_oids[i])
     981           0 :             pg_fatal("field %d: expected type %u, got %u",
     982             :                      i, expected_oids[i], typ);
     983             :     }
     984           2 :     PQclear(res);
     985           2 :     res = PQgetResult(conn);
     986           2 :     if (res != NULL)
     987           0 :         pg_fatal("expected NULL result");
     988             : 
     989           2 :     res = PQgetResult(conn);
     990           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     991           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
     992             : 
     993           2 :     fprintf(stderr, "closing statement..");
     994           2 :     if (PQsendClosePrepared(conn, "select_one") != 1)
     995           0 :         pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
     996           2 :     if (PQpipelineSync(conn) != 1)
     997           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     998             : 
     999           2 :     res = PQgetResult(conn);
    1000           2 :     if (res == NULL)
    1001           0 :         pg_fatal("expected non-NULL result");
    1002           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1003           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1004           2 :     PQclear(res);
    1005           2 :     res = PQgetResult(conn);
    1006           2 :     if (res != NULL)
    1007           0 :         pg_fatal("expected NULL result");
    1008           2 :     res = PQgetResult(conn);
    1009           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1010           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
    1011             : 
    1012           2 :     if (PQexitPipelineMode(conn) != 1)
    1013           0 :         pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
    1014             : 
    1015             :     /* Now that it's closed we should get an error when describing */
    1016           2 :     res = PQdescribePrepared(conn, "select_one");
    1017           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
    1018           0 :         pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
    1019             : 
    1020             :     /*
    1021             :      * Also test the blocking close, this should not fail since closing a
    1022             :      * non-existent prepared statement is a no-op
    1023             :      */
    1024           2 :     res = PQclosePrepared(conn, "select_one");
    1025           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1026           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1027             : 
    1028           2 :     fprintf(stderr, "creating portal... ");
    1029           2 :     PQexec(conn, "BEGIN");
    1030           2 :     PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
    1031           2 :     PQenterPipelineMode(conn);
    1032           2 :     if (PQsendDescribePortal(conn, "cursor_one") != 1)
    1033           0 :         pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
    1034           2 :     if (PQpipelineSync(conn) != 1)
    1035           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1036           2 :     res = PQgetResult(conn);
    1037           2 :     if (res == NULL)
    1038           0 :         pg_fatal("PQgetResult returned null");
    1039           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1040           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1041             : 
    1042           2 :     typ = PQftype(res, 0);
    1043           2 :     if (typ != INT4OID)
    1044           0 :         pg_fatal("portal: expected type %u, got %u",
    1045             :                  INT4OID, typ);
    1046           2 :     PQclear(res);
    1047           2 :     res = PQgetResult(conn);
    1048           2 :     if (res != NULL)
    1049           0 :         pg_fatal("expected NULL result");
    1050           2 :     res = PQgetResult(conn);
    1051           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1052           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
    1053             : 
    1054           2 :     fprintf(stderr, "closing portal... ");
    1055           2 :     if (PQsendClosePortal(conn, "cursor_one") != 1)
    1056           0 :         pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
    1057           2 :     if (PQpipelineSync(conn) != 1)
    1058           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1059             : 
    1060           2 :     res = PQgetResult(conn);
    1061           2 :     if (res == NULL)
    1062           0 :         pg_fatal("expected non-NULL result");
    1063           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1064           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1065           2 :     PQclear(res);
    1066           2 :     res = PQgetResult(conn);
    1067           2 :     if (res != NULL)
    1068           0 :         pg_fatal("expected NULL result");
    1069           2 :     res = PQgetResult(conn);
    1070           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1071           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
    1072             : 
    1073           2 :     if (PQexitPipelineMode(conn) != 1)
    1074           0 :         pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
    1075             : 
    1076             :     /* Now that it's closed we should get an error when describing */
    1077           2 :     res = PQdescribePortal(conn, "cursor_one");
    1078           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
    1079           0 :         pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
    1080             : 
    1081             :     /*
    1082             :      * Also test the blocking close, this should not fail since closing a
    1083             :      * non-existent portal is a no-op
    1084             :      */
    1085           2 :     res = PQclosePortal(conn, "cursor_one");
    1086           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1087           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
    1088             : 
    1089           2 :     fprintf(stderr, "ok\n");
    1090           2 : }
    1091             : 
    1092             : /* Notice processor: print each notice to stderr and count how many we got; arg points to the int counter */
    1093             : static void
    1094           2 : notice_processor(void *arg, const char *message)
    1095             : {
    1096           2 :     int        *n_notices = (int *) arg;
    1097             : 
    1098           2 :     (*n_notices)++;             /* count first, so the number printed includes this notice */
    1099           2 :     fprintf(stderr, "NOTICE %d: %s", *n_notices, message);  /* no newline is appended to message */
    1100           2 : }
    1101             : 
    1102             : /* Verify behavior in "idle" state: when PQexitPipelineMode() is refused and when it is allowed */
    1103             : static void
    1104           2 : test_pipeline_idle(PGconn *conn)
    1105             : {
    1106             :     PGresult   *res;
    1107           2 :     int         n_notices = 0;
    1108             : 
    1109           2 :     fprintf(stderr, "\npipeline idle...\n");
    1110             :     /* Count notices in n_notices; the first step below must not produce any */
    1111           2 :     PQsetNoticeProcessor(conn, notice_processor, &n_notices);
    1112             :     /* NOTE(review): &n_notices is a local and the processor is not reset before returning */
    1113             :     /* Try to exit pipeline mode in pipeline-idle state */
    1114           2 :     if (PQenterPipelineMode(conn) != 1)
    1115           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1116           2 :     if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1117           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1118           2 :     PQsendFlushRequest(conn);   /* NOTE(review): return value ignored, unlike in test_singlerowmode */
    1119           2 :     res = PQgetResult(conn);
    1120           2 :     if (res == NULL)
    1121           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
    1122             :                  PQerrorMessage(conn));
    1123           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1124           0 :         pg_fatal("unexpected result code %s from first pipeline item",
    1125             :                  PQresStatus(PQresultStatus(res)));
    1126           2 :     PQclear(res);
    1127           2 :     res = PQgetResult(conn);
    1128           2 :     if (res != NULL)
    1129           0 :         pg_fatal("did not receive terminating NULL");
    1130           2 :     if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1131           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1132           2 :     if (PQexitPipelineMode(conn) == 1)  /* must be refused while "SELECT 2" is outstanding */
    1133           0 :         pg_fatal("exiting pipeline succeeded when it shouldn't");
    1134           2 :     if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
    1135             :                 strlen("cannot exit pipeline mode")) != 0)  /* only the prefix is compared */
    1136           0 :         pg_fatal("did not get expected error; got: %s",
    1137             :                  PQerrorMessage(conn));
    1138           2 :     PQsendFlushRequest(conn);   /* NOTE(review): return value ignored here as well */
    1139           2 :     res = PQgetResult(conn);
    1140           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1141           0 :         pg_fatal("unexpected result code %s from second pipeline item",
    1142             :                  PQresStatus(PQresultStatus(res)));
    1143           2 :     PQclear(res);
    1144           2 :     res = PQgetResult(conn);
    1145           2 :     if (res != NULL)
    1146           0 :         pg_fatal("did not receive terminating NULL");
    1147           2 :     if (PQexitPipelineMode(conn) != 1)  /* both results consumed, so exiting must now succeed */
    1148           0 :         pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
    1149             :     /* Nothing above should have raised a notice */
    1150           2 :     if (n_notices > 0)
    1151           0 :         pg_fatal("got %d notice(s)", n_notices);
    1152           2 :     fprintf(stderr, "ok - 1\n");
    1153             : 
    1154             :     /* Have a WARNING in the middle of a resultset */
    1155           2 :     if (PQenterPipelineMode(conn) != 1)
    1156           0 :         pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
    1157           2 :     if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
    1158           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
    1159           2 :     PQsendFlushRequest(conn);
    1160           2 :     res = PQgetResult(conn);
    1161           2 :     if (res == NULL)
    1162           0 :         pg_fatal("unexpected NULL result received");
    1163           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1164           0 :         pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
    1165           2 :     if (PQexitPipelineMode(conn) != 1)  /* NOTE(review): res not PQclear'd; notice count not checked in this step */
    1166           0 :         pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
    1167           2 :     fprintf(stderr, "ok - 2\n");
    1168           2 : }
    1169             : 
    1170             : static void
    1171           2 : test_simple_pipeline(PGconn *conn)
    1172             : {
    1173           2 :     PGresult   *res = NULL;
    1174           2 :     const char *dummy_params[1] = {"1"};
    1175           2 :     Oid         dummy_param_oids[1] = {INT4OID};
    1176             :     /* One query plus one sync: check the order of results and when pipeline mode may be exited */
    1177           2 :     fprintf(stderr, "simple pipeline... ");
    1178             : 
    1179             :     /*
    1180             :      * Enter pipeline mode and dispatch a set of operations, which we'll then
    1181             :      * process the results of as they come in.
    1182             :      *
    1183             :      * For a simple case we should be able to do this without interim
    1184             :      * processing of results since our output buffer will give us enough slush
    1185             :      * to work with and we won't block on sending. So blocking mode is fine.
    1186             :      */
    1187           2 :     if (PQisnonblocking(conn))
    1188           0 :         pg_fatal("Expected blocking connection mode");
    1189             : 
    1190           2 :     if (PQenterPipelineMode(conn) != 1)
    1191           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
    1192             : 
    1193           2 :     if (PQsendQueryParams(conn, "SELECT $1",
    1194             :                           1, dummy_param_oids, dummy_params,
    1195             :                           NULL, NULL, 0) != 1)
    1196           0 :         pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
    1197             :     /* A query has been dispatched, so leaving pipeline mode must be refused */
    1198           2 :     if (PQexitPipelineMode(conn) != 0)
    1199           0 :         pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
    1200             : 
    1201           2 :     if (PQpipelineSync(conn) != 1)
    1202           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1203             : 
    1204           2 :     res = PQgetResult(conn);
    1205           2 :     if (res == NULL)
    1206           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
    1207             :                  PQerrorMessage(conn));
    1208             : 
    1209           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1210           0 :         pg_fatal("Unexpected result code %s from first pipeline item",
    1211             :                  PQresStatus(PQresultStatus(res)));
    1212             : 
    1213           2 :     PQclear(res);
    1214           2 :     res = NULL;
    1215             :     /* The query's results must be terminated by a NULL */
    1216           2 :     if (PQgetResult(conn) != NULL)
    1217           0 :         pg_fatal("PQgetResult returned something extra after first query result.");
    1218             : 
    1219             :     /*
    1220             :      * Even though we've processed the result there's still a sync to come and
    1221             :      * we can't exit pipeline mode yet
    1222             :      */
    1223           2 :     if (PQexitPipelineMode(conn) != 0)
    1224           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
    1225             : 
    1226           2 :     res = PQgetResult(conn);
    1227           2 :     if (res == NULL)
    1228           0 :         pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
    1229             :                  PQerrorMessage(conn));
    1230             : 
    1231           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1232           0 :         pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
    1233             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
    1234             : 
    1235           2 :     PQclear(res);
    1236           2 :     res = NULL;
    1237             :     /* Fetch into res so the failure message reports the stray result's status (res is NULL here) */
    1238           2 :     if ((res = PQgetResult(conn)) != NULL)
    1239           0 :         pg_fatal("PQgetResult returned something extra after pipeline end: %s",
    1240             :                  PQresStatus(PQresultStatus(res)));
    1241             : 
    1242             :     /* We're still in pipeline mode... */
    1243           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
    1244           0 :         pg_fatal("Fell out of pipeline mode somehow");
    1245             : 
    1246             :     /* ... until we end it, which we can safely do now */
    1247           2 :     if (PQexitPipelineMode(conn) != 1)
    1248           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
    1249             :                  PQerrorMessage(conn));
    1250             : 
    1251           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
    1252           0 :         pg_fatal("Exiting pipeline mode didn't seem to work");
    1253             : 
    1254           2 :     fprintf(stderr, "ok\n");
    1255           2 : }
    1256             : 
    1257             : static void
    1258           2 : test_singlerowmode(PGconn *conn)
    1259             : {
    1260             :     PGresult   *res;
    1261             :     int         i;
    1262           2 :     bool        pipeline_ended = false;
    1263             :     /* Verify PQsetSingleRowMode() inside a pipeline, and that it applies to one query only */
    1264           2 :     if (PQenterPipelineMode(conn) != 1)
    1265           0 :         pg_fatal("failed to enter pipeline mode: %s",
    1266             :                  PQerrorMessage(conn));
    1267             : 
    1268             :     /* One series of three commands, using single-row mode for the first two. */
    1269           8 :     for (i = 0; i < 3; i++)
    1270             :     {
    1271             :         char       *param[1];
    1272             : 
    1273           6 :         param[0] = psprintf("%d", 44 + i);  /* upper bound of the series: 44, 45, 46 */
    1274             : 
    1275           6 :         if (PQsendQueryParams(conn,
    1276             :                               "SELECT generate_series(42, $1)",
    1277             :                               1,
    1278             :                               NULL,
    1279             :                               (const char **) param,
    1280             :                               NULL,
    1281             :                               NULL,
    1282             :                               0) != 1)
    1283           0 :             pg_fatal("failed to send query: %s",
    1284             :                      PQerrorMessage(conn));
    1285           6 :         pfree(param[0]);
    1286             :     }
    1287           2 :     if (PQpipelineSync(conn) != 1)
    1288           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1289             :     /* Iteration i reads the results of query i; the fourth iteration (i == 3) must see the sync */
    1290          10 :     for (i = 0; !pipeline_ended; i++)
    1291             :     {
    1292           8 :         bool        first = true;
    1293             :         bool        saw_ending_tuplesok;
    1294           8 :         bool        isSingleTuple = false;
    1295             : 
    1296             :         /* Set single row mode for only first 2 SELECT queries */
    1297           8 :         if (i < 2)
    1298             :         {
    1299           4 :             if (PQsetSingleRowMode(conn) != 1)
    1300           0 :                 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
    1301             :         }
    1302             : 
    1303             :         /* Consume rows for this query */
    1304           8 :         saw_ending_tuplesok = false;
    1305          28 :         while ((res = PQgetResult(conn)) != NULL)
    1306             :         {
    1307          22 :             ExecStatusType est = PQresultStatus(res);
    1308             : 
    1309          22 :             if (est == PGRES_PIPELINE_SYNC)
    1310             :             {
    1311           2 :                 fprintf(stderr, "end of pipeline reached\n");
    1312           2 :                 pipeline_ended = true;
    1313           2 :                 PQclear(res);
    1314           2 :                 if (i != 3)     /* the sync must come only after all three queries */
    1315           0 :                     pg_fatal("Expected three results, got %d", i);
    1316           2 :                 break;
    1317             :             }
    1318             : 
    1319             :             /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
    1320          20 :             if (first)
    1321             :             {
    1322           6 :                 if (i <= 1 && est != PGRES_SINGLE_TUPLE)
    1323           0 :                     pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
    1324             :                              i, PQresStatus(est));
    1325           6 :                 if (i >= 2 && est != PGRES_TUPLES_OK)
    1326           0 :                     pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
    1327             :                              i, PQresStatus(est));
    1328           6 :                 first = false;
    1329             :             }
    1330             : 
    1331          20 :             fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
    1332          20 :             switch (est)
    1333             :             {
    1334           6 :                 case PGRES_TUPLES_OK:
    1335           6 :                     fprintf(stderr, ", tuples: %d\n", PQntuples(res));
    1336           6 :                     saw_ending_tuplesok = true;
    1337           6 :                     if (isSingleTuple)  /* after single rows, the closing TUPLES_OK must be empty */
    1338             :                     {
    1339           4 :                         if (PQntuples(res) == 0)
    1340           4 :                             fprintf(stderr, "all tuples received in query %d\n", i);
    1341             :                         else
    1342           0 :                             pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
    1343             :                     }
    1344           6 :                     break;
    1345             : 
    1346          14 :                 case PGRES_SINGLE_TUPLE:
    1347          14 :                     isSingleTuple = true;
    1348          14 :                     fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
    1349          14 :                     break;
    1350             : 
    1351           0 :                 default:
    1352           0 :                     pg_fatal("unexpected");
    1353             :             }
    1354          20 :             PQclear(res);
    1355             :         }
    1356           8 :         if (!pipeline_ended && !saw_ending_tuplesok)
    1357           0 :             pg_fatal("didn't get expected terminating TUPLES_OK");
    1358             :     }
    1359             : 
    1360             :     /*
    1361             :      * Now issue one command, get its results in with single-row mode, then
    1362             :      * issue another command, and get its results in normal mode; make sure
    1363             :      * the single-row mode flag is reset as expected.
    1364             :      */
    1365           2 :     if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
    1366             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1367           0 :         pg_fatal("failed to send query: %s",
    1368             :                  PQerrorMessage(conn));
    1369           2 :     if (PQsendFlushRequest(conn) != 1)
    1370           0 :         pg_fatal("failed to send flush request");
    1371           2 :     if (PQsetSingleRowMode(conn) != 1)
    1372           0 :         pg_fatal("PQsetSingleRowMode() failed");
    1373           2 :     res = PQgetResult(conn);
    1374           2 :     if (res == NULL)
    1375           0 :         pg_fatal("unexpected NULL");
    1376           2 :     if (PQresultStatus(res) != PGRES_SINGLE_TUPLE)
    1377           0 :         pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
    1378             :                  PQresStatus(PQresultStatus(res)));
    1379           2 :     res = PQgetResult(conn);    /* NOTE(review): previous res is overwritten without PQclear */
    1380           2 :     if (res == NULL)
    1381           0 :         pg_fatal("unexpected NULL");
    1382           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1383           0 :         pg_fatal("Expected PGRES_TUPLES_OK, got %s",
    1384             :                  PQresStatus(PQresultStatus(res)));
    1385           2 :     if (PQgetResult(conn) != NULL)
    1386           0 :         pg_fatal("expected NULL result");
    1387             :     /* Single-row mode is not requested for this query, so its first result must be a plain TUPLES_OK */
    1388           2 :     if (PQsendQueryParams(conn, "SELECT 1",
    1389             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1390           0 :         pg_fatal("failed to send query: %s",
    1391             :                  PQerrorMessage(conn));
    1392           2 :     if (PQsendFlushRequest(conn) != 1)
    1393           0 :         pg_fatal("failed to send flush request");
    1394           2 :     res = PQgetResult(conn);    /* NOTE(review): previous res is overwritten without PQclear */
    1395           2 :     if (res == NULL)
    1396           0 :         pg_fatal("unexpected NULL");
    1397           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1398           0 :         pg_fatal("Expected PGRES_TUPLES_OK, got %s",
    1399             :                  PQresStatus(PQresultStatus(res)));
    1400           2 :     if (PQgetResult(conn) != NULL)
    1401           0 :         pg_fatal("expected NULL result");
    1402             :     /* NOTE(review): the last res is not PQclear'd before returning either */
    1403           2 :     if (PQexitPipelineMode(conn) != 1)
    1404           0 :         pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
    1405             : 
    1406           2 :     fprintf(stderr, "ok\n");
    1407           2 : }
    1408             : 
    1409             : /*
    1410             :  * Simple test to verify that a pipeline is discarded as a whole when there's
    1411             :  * an error, ignoring transaction commands.
    1412             :  */
    1413             : static void
    1414           2 : test_transaction(PGconn *conn)
    1415             : {
    1416             :     PGresult   *res;
    1417             :     bool        expect_null;
    1418           2 :     int         num_syncs = 0;      /* syncs sent whose PGRES_PIPELINE_SYNC we have not read yet */
    1419             : 
    1420           2 :     res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
    1421             :                  "CREATE TABLE pq_pipeline_tst (id int)");
    1422           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1423           0 :         pg_fatal("failed to create test table: %s",
    1424             :                  PQerrorMessage(conn));
    1425           2 :     PQclear(res);
    1426             : 
    1427           2 :     if (PQenterPipelineMode(conn) != 1)
    1428           0 :         pg_fatal("failed to enter pipeline mode: %s",
    1429             :                  PQerrorMessage(conn));
    1430           2 :     if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
    1431           0 :         pg_fatal("could not send prepare on pipeline: %s",
    1432             :                  PQerrorMessage(conn));
    1433             : 
    1434           2 :     if (PQsendQueryParams(conn,
    1435             :                           "BEGIN",
    1436             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1437           0 :         pg_fatal("failed to send query: %s",
    1438             :                  PQerrorMessage(conn));
    1439           2 :     if (PQsendQueryParams(conn,
    1440             :                           "SELECT 0/0",
    1441             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1442           0 :         pg_fatal("failed to send query: %s",
    1443             :                  PQerrorMessage(conn));
    1444             : 
    1445             :     /*
    1446             :      * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
    1447             :      * get out of the pipeline-aborted state first.
    1448             :      */
    1449           2 :     if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
    1450           0 :         pg_fatal("failed to execute prepared: %s",
    1451             :                  PQerrorMessage(conn));
    1452             : 
    1453             :     /* This insert fails because we're in pipeline-aborted state */
    1454           2 :     if (PQsendQueryParams(conn,
    1455             :                           "INSERT INTO pq_pipeline_tst VALUES (1)",
    1456             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1457           0 :         pg_fatal("failed to send query: %s",
    1458             :                  PQerrorMessage(conn));
    1459           2 :     if (PQpipelineSync(conn) != 1)
    1460           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1461           2 :     num_syncs++;
    1462             : 
    1463             :     /*
    1464             :      * This insert fails even though the pipeline got a SYNC, because we're in
    1465             :      * an aborted transaction
    1466             :      */
    1467           2 :     if (PQsendQueryParams(conn,
    1468             :                           "INSERT INTO pq_pipeline_tst VALUES (2)",
    1469             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1470           0 :         pg_fatal("failed to send query: %s",
    1471             :                  PQerrorMessage(conn));
    1472           2 :     if (PQpipelineSync(conn) != 1)
    1473           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1474           2 :     num_syncs++;
    1475             : 
    1476             :     /*
    1477             :      * Send ROLLBACK using prepared stmt. This one works because we just did
    1478             :      * PQpipelineSync above.
    1479             :      */
    1480           2 :     if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
    1481           0 :         pg_fatal("failed to execute prepared: %s",
    1482             :                  PQerrorMessage(conn));
    1483             : 
    1484             :     /*
    1485             :      * Now that we're out of a transaction and in pipeline-good mode, this
    1486             :      * insert works
    1487             :      */
    1488           2 :     if (PQsendQueryParams(conn,
    1489             :                           "INSERT INTO pq_pipeline_tst VALUES (3)",
    1490             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1491           0 :         pg_fatal("failed to send query: %s",
    1492             :                  PQerrorMessage(conn));
    1493             :     /* Send two syncs now -- match up to SYNC messages below */
    1494           2 :     if (PQpipelineSync(conn) != 1)
    1495           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1496           2 :     num_syncs++;
    1497           2 :     if (PQpipelineSync(conn) != 1)
    1498           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1499           2 :     num_syncs++;
    1500             :     /* Read until every sync is seen: each non-sync result must be followed by exactly one NULL */
    1501           2 :     expect_null = false;
    1502           2 :     for (int i = 0;; i++)
    1503          38 :     {
    1504             :         ExecStatusType restype;
    1505             : 
    1506          40 :         res = PQgetResult(conn);
    1507          40 :         if (res == NULL)
    1508             :         {
    1509          16 :             printf("%d: got NULL result\n", i);
    1510          16 :             if (!expect_null)
    1511           0 :                 pg_fatal("did not expect NULL here");
    1512          16 :             expect_null = false;
    1513          16 :             continue;
    1514             :         }
    1515          24 :         restype = PQresultStatus(res);
    1516          24 :         printf("%d: got status %s", i, PQresStatus(restype));
    1517          24 :         if (expect_null)
    1518           0 :             pg_fatal("expected NULL");
    1519          24 :         if (restype == PGRES_FATAL_ERROR)
    1520           4 :             printf("; error: %s", PQerrorMessage(conn));
    1521          20 :         else if (restype == PGRES_PIPELINE_ABORTED)
    1522             :         {
    1523           4 :             printf(": command didn't run because pipeline aborted\n");
    1524             :         }
    1525             :         else
    1526          16 :             printf("\n");
    1527          24 :         PQclear(res);
    1528             :         /* A sync result is not followed by a NULL; anything else is */
    1529          24 :         if (restype == PGRES_PIPELINE_SYNC)
    1530           8 :             num_syncs--;
    1531             :         else
    1532          16 :             expect_null = true;
    1533          24 :         if (num_syncs <= 0)
    1534           2 :             break;
    1535             :     }
    1536           2 :     if ((res = PQgetResult(conn)) != NULL)  /* res was PQclear'd in the loop; report the new result, not the freed one */
    1537           0 :         pg_fatal("returned something extra after all the syncs: %s",
    1538             :                  PQresStatus(PQresultStatus(res)));
    1539             : 
    1540           2 :     if (PQexitPipelineMode(conn) != 1)
    1541           0 :         pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
    1542             : 
    1543             :     /* We expect to find one tuple containing the value "3" */
    1544           2 :     res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
    1545           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1546           0 :         pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
    1547           2 :     if (PQntuples(res) != 1)
    1548           0 :         pg_fatal("did not get 1 tuple");
    1549           2 :     if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
    1550           0 :         pg_fatal("did not get expected tuple");
    1551           2 :     PQclear(res);
    1552             : 
    1553           2 :     fprintf(stderr, "ok\n");
    1554           2 : }
    1555             : 
    1556             : /*
    1557             :  * In this test mode we send a stream of queries, with one in the middle
    1558             :  * causing an error.  Verify that we can still send some more after the
    1559             :  * error and have libpq work properly.
    1560             :  */
    1561             : static void
    1562           2 : test_uniqviol(PGconn *conn)
    1563             : {
    1564           2 :     int         sock = PQsocket(conn);
    1565             :     PGresult   *res;
    1566           2 :     Oid         paramTypes[2] = {INT8OID, INT8OID};
    1567             :     const char *paramValues[2];
    1568             :     char        paramValue0[MAXINT8LEN];
    1569             :     char        paramValue1[MAXINT8LEN];
    1570           2 :     int         ctr = 0;
    1571           2 :     int         numsent = 0;
    1572           2 :     int         results = 0;
    1573           2 :     bool        read_done = false;
    1574           2 :     bool        write_done = false;
    1575           2 :     bool        error_sent = false;
    1576           2 :     bool        got_error = false;
    1577           2 :     int         switched = 0;
    1578           2 :     int         socketful = 0;
    1579             :     fd_set      in_fds;
    1580             :     fd_set      out_fds;
    1581             : 
    1582           2 :     fprintf(stderr, "uniqviol ...");
    1583             : 
    1584           2 :     PQsetnonblocking(conn, 1);
    1585             : 
    1586           2 :     paramValues[0] = paramValue0;
    1587           2 :     paramValues[1] = paramValue1;
    1588           2 :     sprintf(paramValue1, "42");
    1589             : 
    1590           2 :     res = PQexec(conn, "drop table if exists ppln_uniqviol;"
    1591             :                  "create table ppln_uniqviol(id bigint primary key, idata bigint)");
    1592           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1593           0 :         pg_fatal("failed to create table: %s", PQerrorMessage(conn));
    1594             : 
    1595           2 :     res = PQexec(conn, "begin");
    1596           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1597           0 :         pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
    1598             : 
    1599           2 :     res = PQprepare(conn, "insertion",
    1600             :                     "insert into ppln_uniqviol values ($1, $2) returning id",
    1601             :                     2, paramTypes);
    1602           2 :     if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
    1603           0 :         pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
    1604             : 
    1605           2 :     if (PQenterPipelineMode(conn) != 1)
    1606           0 :         pg_fatal("failed to enter pipeline mode");
    1607             : 
    1608          16 :     while (!read_done)
    1609             :     {
    1610             :         /*
    1611             :          * Avoid deadlocks by reading everything the server has sent before
    1612             :          * sending anything.  (Special precaution is needed here to process
    1613             :          * PQisBusy before testing the socket for read-readiness, because the
    1614             :          * socket does not turn read-ready after "sending" queries in aborted
    1615             :          * pipeline mode.)
    1616             :          */
    1617        1212 :         while (PQisBusy(conn) == 0)
    1618             :         {
    1619             :             bool        new_error;
    1620             : 
    1621        1202 :             if (results >= numsent)
    1622             :             {
    1623           2 :                 if (write_done)
    1624           0 :                     read_done = true;
    1625           2 :                 break;
    1626             :             }
    1627             : 
    1628        1200 :             res = PQgetResult(conn);
    1629        1200 :             new_error = process_result(conn, res, results, numsent);
    1630        1200 :             if (new_error && got_error)
    1631           0 :                 pg_fatal("got two errors");
    1632        1200 :             got_error |= new_error;
    1633        1200 :             if (results++ >= numsent - 1)
    1634             :             {
    1635           4 :                 if (write_done)
    1636           2 :                     read_done = true;
    1637           4 :                 break;
    1638             :             }
    1639             :         }
    1640             : 
    1641          16 :         if (read_done)
    1642           2 :             break;
    1643             : 
    1644          14 :         FD_ZERO(&out_fds);
    1645          14 :         FD_SET(sock, &out_fds);
    1646             : 
    1647          14 :         FD_ZERO(&in_fds);
    1648          14 :         FD_SET(sock, &in_fds);
    1649             : 
    1650          14 :         if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
    1651             :         {
    1652           0 :             if (errno == EINTR)
    1653           0 :                 continue;
    1654           0 :             pg_fatal("select() failed: %m");
    1655             :         }
    1656             : 
    1657          14 :         if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
    1658           0 :             pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
    1659             : 
    1660             :         /*
    1661             :          * If the socket is writable and we haven't finished sending queries,
    1662             :          * send some.
    1663             :          */
    1664          14 :         if (!write_done && FD_ISSET(sock, &out_fds))
    1665             :         {
    1666             :             for (;;)
    1667        1194 :             {
    1668             :                 int         flush;
    1669             : 
    1670             :                 /*
    1671             :                  * provoke uniqueness violation exactly once after having
    1672             :                  * switched to read mode.
    1673             :                  */
    1674        1200 :                 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
    1675             :                 {
    1676           2 :                     sprintf(paramValue0, "%d", numsent / 2);
    1677           2 :                     fprintf(stderr, "E");
    1678           2 :                     error_sent = true;
    1679             :                 }
    1680             :                 else
    1681             :                 {
    1682        1198 :                     fprintf(stderr, ".");
    1683        1198 :                     sprintf(paramValue0, "%d", ctr++);
    1684             :                 }
    1685             : 
    1686        1200 :                 if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
    1687           0 :                     pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
    1688        1200 :                 numsent++;
    1689             : 
    1690             :                 /* Are we done writing? */
    1691        1200 :                 if (socketful != 0 && numsent % socketful == 42 && error_sent)
    1692             :                 {
    1693           2 :                     if (PQsendFlushRequest(conn) != 1)
    1694           0 :                         pg_fatal("failed to send flush request");
    1695           2 :                     write_done = true;
    1696           2 :                     fprintf(stderr, "\ndone writing\n");
    1697           2 :                     PQflush(conn);
    1698           2 :                     break;
    1699             :                 }
    1700             : 
    1701             :                 /* is the outgoing socket full? */
    1702        1198 :                 flush = PQflush(conn);
    1703        1198 :                 if (flush == -1)
    1704           0 :                     pg_fatal("failed to flush: %s", PQerrorMessage(conn));
    1705        1198 :                 if (flush == 1)
    1706             :                 {
    1707           4 :                     if (socketful == 0)
    1708           2 :                         socketful = numsent;
    1709           4 :                     fprintf(stderr, "\nswitch to reading\n");
    1710           4 :                     switched++;
    1711           4 :                     break;
    1712             :                 }
    1713             :             }
    1714             :         }
    1715             :     }
    1716             : 
    1717           2 :     if (!got_error)
    1718           0 :         pg_fatal("did not get expected error");
    1719             : 
    1720           2 :     fprintf(stderr, "ok\n");
    1721           2 : }
    1722             : 
    1723             : /*
    1724             :  * Subroutine for test_uniqviol; given a PGresult, print it out and consume
    1725             :  * the expected NULL that should follow it.
    1726             :  *
    1727             :  * Returns true if we read a fatal error message, otherwise false.
    1728             :  */
    1729             : static bool
    1730        1200 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
    1731             : {
    1732             :     PGresult   *res2;
    1733        1200 :     bool        got_error = false;
    1734             : 
    1735        1200 :     if (res == NULL)
    1736           0 :         pg_fatal("got unexpected NULL");
    1737             : 
    1738        1200 :     switch (PQresultStatus(res))
    1739             :     {
    1740           2 :         case PGRES_FATAL_ERROR:
    1741           2 :             got_error = true;
    1742           2 :             fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
    1743           2 :             PQclear(res);
    1744             : 
    1745           2 :             res2 = PQgetResult(conn);
    1746           2 :             if (res2 != NULL)
    1747           0 :                 pg_fatal("expected NULL, got %s",
    1748             :                          PQresStatus(PQresultStatus(res2)));
    1749           2 :             break;
    1750             : 
    1751         836 :         case PGRES_TUPLES_OK:
    1752         836 :             fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
    1753         836 :             PQclear(res);
    1754             : 
    1755         836 :             res2 = PQgetResult(conn);
    1756         836 :             if (res2 != NULL)
    1757           0 :                 pg_fatal("expected NULL, got %s",
    1758             :                          PQresStatus(PQresultStatus(res2)));
    1759         836 :             break;
    1760             : 
    1761         362 :         case PGRES_PIPELINE_ABORTED:
    1762         362 :             fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
    1763         362 :             res2 = PQgetResult(conn);
    1764         362 :             if (res2 != NULL)
    1765           0 :                 pg_fatal("expected NULL, got %s",
    1766             :                          PQresStatus(PQresultStatus(res2)));
    1767         362 :             break;
    1768             : 
    1769           0 :         default:
    1770           0 :             pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
    1771             :     }
    1772             : 
    1773        1200 :     return got_error;
    1774             : }
    1775             : 
    1776             : 
    1777             : static void
    1778           0 : usage(const char *progname)
    1779             : {
    1780           0 :     fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
    1781           0 :     fprintf(stderr, "Usage:\n");
    1782           0 :     fprintf(stderr, "  %s [OPTION] tests\n", progname);
    1783           0 :     fprintf(stderr, "  %s [OPTION] TESTNAME [CONNINFO]\n", progname);
    1784           0 :     fprintf(stderr, "\nOptions:\n");
    1785           0 :     fprintf(stderr, "  -t TRACEFILE       generate a libpq trace to TRACEFILE\n");
    1786           0 :     fprintf(stderr, "  -r NUMROWS         use NUMROWS as the test size\n");
    1787           0 : }
    1788             : 
    1789             : static void
    1790           2 : print_test_list(void)
    1791             : {
    1792           2 :     printf("disallowed_in_pipeline\n");
    1793           2 :     printf("multi_pipelines\n");
    1794           2 :     printf("nosync\n");
    1795           2 :     printf("pipeline_abort\n");
    1796           2 :     printf("pipeline_idle\n");
    1797           2 :     printf("pipelined_insert\n");
    1798           2 :     printf("prepared\n");
    1799           2 :     printf("simple_pipeline\n");
    1800           2 :     printf("singlerow\n");
    1801           2 :     printf("transaction\n");
    1802           2 :     printf("uniqviol\n");
    1803           2 : }
    1804             : 
    1805             : int
    1806          24 : main(int argc, char **argv)
    1807             : {
    1808          24 :     const char *conninfo = "";
    1809             :     PGconn     *conn;
    1810             :     FILE       *trace;
    1811             :     char       *testname;
    1812          24 :     int         numrows = 10000;
    1813             :     PGresult   *res;
    1814             :     int         c;
    1815             : 
    1816          88 :     while ((c = getopt(argc, argv, "r:t:")) != -1)
    1817             :     {
    1818          40 :         switch (c)
    1819             :         {
    1820          22 :             case 'r':           /* numrows */
    1821          22 :                 errno = 0;
    1822          22 :                 numrows = strtol(optarg, NULL, 10);
    1823          22 :                 if (errno != 0 || numrows <= 0)
    1824             :                 {
    1825           0 :                     fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
    1826             :                             optarg);
    1827           0 :                     exit(1);
    1828             :                 }
    1829          22 :                 break;
    1830          18 :             case 't':           /* trace file */
    1831          18 :                 tracefile = pg_strdup(optarg);
    1832          18 :                 break;
    1833             :         }
    1834          64 :     }
    1835             : 
    1836          24 :     if (optind < argc)
    1837             :     {
    1838          24 :         testname = pg_strdup(argv[optind]);
    1839          24 :         optind++;
    1840             :     }
    1841             :     else
    1842             :     {
    1843           0 :         usage(argv[0]);
    1844           0 :         exit(1);
    1845             :     }
    1846             : 
    1847          24 :     if (strcmp(testname, "tests") == 0)
    1848             :     {
    1849           2 :         print_test_list();
    1850           2 :         exit(0);
    1851             :     }
    1852             : 
    1853          22 :     if (optind < argc)
    1854             :     {
    1855          22 :         conninfo = pg_strdup(argv[optind]);
    1856          22 :         optind++;
    1857             :     }
    1858             : 
    1859             :     /* Make a connection to the database */
    1860          22 :     conn = PQconnectdb(conninfo);
    1861          22 :     if (PQstatus(conn) != CONNECTION_OK)
    1862             :     {
    1863           0 :         fprintf(stderr, "Connection to database failed: %s\n",
    1864             :                 PQerrorMessage(conn));
    1865           0 :         exit_nicely(conn);
    1866             :     }
    1867             : 
    1868          22 :     res = PQexec(conn, "SET lc_messages TO \"C\"");
    1869          22 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1870           0 :         pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
    1871          22 :     res = PQexec(conn, "SET debug_parallel_query = off");
    1872          22 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1873           0 :         pg_fatal("failed to set debug_parallel_query: %s", PQerrorMessage(conn));
    1874             : 
    1875             :     /* Set the trace file, if requested */
    1876          22 :     if (tracefile != NULL)
    1877             :     {
    1878          18 :         if (strcmp(tracefile, "-") == 0)
    1879           0 :             trace = stdout;
    1880             :         else
    1881          18 :             trace = fopen(tracefile, "w");
    1882          18 :         if (trace == NULL)
    1883           0 :             pg_fatal("could not open file \"%s\": %m", tracefile);
    1884             : 
    1885             :         /* Make it line-buffered */
    1886          18 :         setvbuf(trace, NULL, PG_IOLBF, 0);
    1887             : 
    1888          18 :         PQtrace(conn, trace);
    1889          18 :         PQsetTraceFlags(conn,
    1890             :                         PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
    1891             :     }
    1892             : 
    1893          22 :     if (strcmp(testname, "disallowed_in_pipeline") == 0)
    1894           2 :         test_disallowed_in_pipeline(conn);
    1895          20 :     else if (strcmp(testname, "multi_pipelines") == 0)
    1896           2 :         test_multi_pipelines(conn);
    1897          18 :     else if (strcmp(testname, "nosync") == 0)
    1898           2 :         test_nosync(conn);
    1899          16 :     else if (strcmp(testname, "pipeline_abort") == 0)
    1900           2 :         test_pipeline_abort(conn);
    1901          14 :     else if (strcmp(testname, "pipeline_idle") == 0)
    1902           2 :         test_pipeline_idle(conn);
    1903          12 :     else if (strcmp(testname, "pipelined_insert") == 0)
    1904           2 :         test_pipelined_insert(conn, numrows);
    1905          10 :     else if (strcmp(testname, "prepared") == 0)
    1906           2 :         test_prepared(conn);
    1907           8 :     else if (strcmp(testname, "simple_pipeline") == 0)
    1908           2 :         test_simple_pipeline(conn);
    1909           6 :     else if (strcmp(testname, "singlerow") == 0)
    1910           2 :         test_singlerowmode(conn);
    1911           4 :     else if (strcmp(testname, "transaction") == 0)
    1912           2 :         test_transaction(conn);
    1913           2 :     else if (strcmp(testname, "uniqviol") == 0)
    1914           2 :         test_uniqviol(conn);
    1915             :     else
    1916             :     {
    1917           0 :         fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
    1918           0 :         exit(1);
    1919             :     }
    1920             : 
    1921             :     /* close the connection to the database and cleanup */
    1922          22 :     PQfinish(conn);
    1923          22 :     return 0;
    1924             : }

Generated by: LCOV version 1.14