LCOV - code coverage report
Current view: top level - src/test/modules/libpq_pipeline - libpq_pipeline.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 621 853 72.8 %
Date: 2021-11-29 04:09:17 Functions: 13 16 81.2 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * libpq_pipeline.c
       4             :  *      Verify libpq pipeline execution functionality
       5             :  *
       6             :  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
       7             :  * Portions Copyright (c) 1994, Regents of the University of California
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *      src/test/modules/libpq_pipeline/libpq_pipeline.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : #include "postgres_fe.h"
      17             : 
      18             : #include <sys/time.h>
      19             : #ifdef HAVE_SYS_SELECT_H
      20             : #include <sys/select.h>
      21             : #endif
      22             : 
      23             : #include "catalog/pg_type_d.h"
      24             : #include "common/fe_memutils.h"
      25             : #include "libpq-fe.h"
      26             : #include "pg_getopt.h"
      27             : #include "portability/instr_time.h"
      28             : 
      29             : 
      30             : static void exit_nicely(PGconn *conn);
      31             : static bool process_result(PGconn *conn, PGresult *res, int results,
      32             :                            int numsent);
      33             : 
                       /* Program name; pg_fatal_impl() prefixes each error message with it. */
      34             : const char *const progname = "libpq_pipeline";
      35             : 
      36             : /* Options and defaults */
      37             : char       *tracefile = NULL;   /* path to PQtrace() file */
      38             : 
      39             : 
                       /*
                        * pg_debug(): printf-style diagnostics written to stderr when this file is
                        * compiled with DEBUG_OUTPUT defined; otherwise the macro expands to
                        * nothing, so its arguments are not evaluated.
                        */
      40             : #ifdef DEBUG_OUTPUT
      41             : #define pg_debug(...)  do { fprintf(stderr, __VA_ARGS__); } while (0)
      42             : #else
      43             : #define pg_debug(...)
      44             : #endif
      45             : 
                       /*
                        * SQL text shared by the tests.  All statements operate on the
                        * pq_pipeline_demo table.  insert_sql takes one parameter (itemno);
                        * insert_sql2 takes two (itemno, int8filler).
                        */
      46             : static const char *const drop_table_sql =
      47             : "DROP TABLE IF EXISTS pq_pipeline_demo";
      48             : static const char *const create_table_sql =
      49             : "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
      50             : "int8filler int8);";
      51             : static const char *const insert_sql =
      52             : "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
      53             : static const char *const insert_sql2 =
      54             : "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
      55             : 
      56             : /*
                        * Buffer sizes for the decimal text form of an int32/int64: the longest
                        * value (the most negative one) including its sign, plus the null
                        * terminator.  INT32_MIN "-2147483648" is 11 characters, so 12 bytes;
                        * INT64_MIN "-9223372036854775808" is 20 characters, so 21 bytes.
                        */
      57             : #define MAXINTLEN 12
      58             : #define MAXINT8LEN 21
      59             : 
                       /*
                        * Close the libpq connection and terminate the program with exit status 1.
                        *
                        * Nothing is printed here; a caller that wants a message must write it
                        * before calling.  Does not return.
                        */
      60             : static void
      61           0 : exit_nicely(PGconn *conn)
      62             : {
      63           0 :     PQfinish(conn);
      64           0 :     exit(1);
      65             : }
      66             : 
      67             : /*
      68             :  * Print an error to stderr and terminate the program.
                        *
                        * Callers use the pg_fatal() macro, which supplies __LINE__ so that the
                        * message identifies the call site.  stdout is flushed first; then the
                        * message is written to stderr as a newline, "<progname>:<line>: ", the
                        * formatted text and a final newline, and the process exits with status 1.
                        *
                        * fmt is a printf-style format followed by its arguments.  It must not end
                        * in a newline, because one is appended here; the Assert below checks that.
                        *
                        * NOTE(review): the Assert indexes fmt[strlen(fmt) - 1], so it expects a
                        * non-empty format string.
      69             :  */
      70             : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
      71             : static void
      72             : pg_attribute_noreturn()
      73           0 : pg_fatal_impl(int line, const char *fmt,...)
      74             : {
      75             :     va_list     args;
      76             : 
      77             : 
                           /* push out pending stdout text so it precedes the error message */
      78           0 :     fflush(stdout);
      79             : 
      80           0 :     fprintf(stderr, "\n%s:%d: ", progname, line);
      81           0 :     va_start(args, fmt);
      82           0 :     vfprintf(stderr, fmt, args);
      83           0 :     va_end(args);
                           /* the newline is added below, so the caller must not supply one */
      84             :     Assert(fmt[strlen(fmt) - 1] != '\n');
      85           0 :     fprintf(stderr, "\n");
      86           0 :     exit(1);
      87             : }
      88             : 
                       /*
                        * Check entering and leaving pipeline mode on an idle connection, and that
                        * PQexec is rejected while pipeline mode is active.
                        *
                        * conn must be an open connection in blocking mode.  Any unexpected outcome
                        * is reported through pg_fatal(), which terminates the program.  On return
                        * the connection is back in normal command mode.
                        */
      89             : static void
      90           2 : test_disallowed_in_pipeline(PGconn *conn)
      91             : {
      92           2 :     PGresult   *res = NULL;
      93             : 
      94           2 :     fprintf(stderr, "test error cases... ");
      95             : 
      96           2 :     if (PQisnonblocking(conn))
      97           0 :         pg_fatal("Expected blocking connection mode");
      98             : 
      99           2 :     if (PQenterPipelineMode(conn) != 1)
     100           0 :         pg_fatal("Unable to enter pipeline mode");
     101             : 
     102           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     103           0 :         pg_fatal("Pipeline mode not activated properly");
     104             : 
     105             :     /* PQexec should fail in pipeline mode */
     106           2 :     res = PQexec(conn, "SELECT 1");
     107           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
     108           0 :         pg_fatal("PQexec should fail in pipeline mode but succeeded");
                           /* the error result is ours to free before res is reused below */
                           PQclear(res);
     109             : 
     110             :     /* Entering pipeline mode when already in pipeline mode is OK */
     111           2 :     if (PQenterPipelineMode(conn) != 1)
     112           0 :         pg_fatal("re-entering pipeline mode should be a no-op but failed");
     113             : 
     114           2 :     if (PQisBusy(conn) != 0)
     115           0 :         pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
     116             : 
     117             :     /* ok, back to normal command mode */
     118           2 :     if (PQexitPipelineMode(conn) != 1)
     119           0 :         pg_fatal("couldn't exit idle empty pipeline mode");
     120             : 
     121           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     122           0 :         pg_fatal("Pipeline mode not terminated properly");
     123             : 
     124             :     /* exiting pipeline mode when not in pipeline mode should be a no-op */
     125           2 :     if (PQexitPipelineMode(conn) != 1)
     126           0 :         pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
     127             : 
     128             :     /* can now PQexec again */
     129           2 :     res = PQexec(conn, "SELECT 1");
     130           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     131           0 :         pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
     132             :                  PQerrorMessage(conn));
                           PQclear(res);
     133             : 
     134           2 :     fprintf(stderr, "ok\n");
     135           2 : }
     136             : 
                       /*
                        * Send two one-query pipelines back to back, each ended by PQpipelineSync,
                        * and then read the results of both without leaving pipeline mode in
                        * between.
                        *
                        * For each pipeline the expected result sequence is: a TUPLES_OK result, a
                        * NULL marking the end of that query, then a PIPELINE_SYNC result.  The test
                        * also checks that PQexitPipelineMode is refused while results are still
                        * pending and succeeds once everything has been consumed.
                        *
                        * Any unexpected outcome is reported through pg_fatal(), which terminates
                        * the program.  On return the connection is back in normal command mode.
                        */
     137             : static void
     138           2 : test_multi_pipelines(PGconn *conn)
     139             : {
     140           2 :     PGresult   *res = NULL;
     141           2 :     const char *dummy_params[1] = {"1"};
     142           2 :     Oid         dummy_param_oids[1] = {INT4OID};
     143             : 
     144           2 :     fprintf(stderr, "multi pipeline... ");
     145             : 
     146             :     /*
     147             :      * Queue up a couple of small pipelines and process each without returning
     148             :      * to command mode first.
     149             :      */
     150           2 :     if (PQenterPipelineMode(conn) != 1)
     151           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     152             : 
     153           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     154             :                           dummy_params, NULL, NULL, 0) != 1)
     155           0 :         pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
     156             : 
     157           2 :     if (PQpipelineSync(conn) != 1)
     158           0 :         pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
     159             : 
     160           2 :     if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
     161             :                           dummy_params, NULL, NULL, 0) != 1)
     162           0 :         pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
     163             : 
     164           2 :     if (PQpipelineSync(conn) != 1)
     165           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     166             : 
     167             :     /* OK, start processing the results */
     168           2 :     res = PQgetResult(conn);
     169           2 :     if (res == NULL)
     170           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     171             :                  PQerrorMessage(conn));
     172             : 
     173           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     174           0 :         pg_fatal("Unexpected result code %s from first pipeline item",
     175             :                  PQresStatus(PQresultStatus(res)));
     176           2 :     PQclear(res);
     177           2 :     res = NULL;
     178             : 
     179           2 :     if (PQgetResult(conn) != NULL)
     180           0 :         pg_fatal("PQgetResult returned something extra after first result");
     181             : 
                           /* results are still pending, so leaving pipeline mode must be refused */
     182           2 :     if (PQexitPipelineMode(conn) != 0)
     183           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
     184             : 
     185           2 :     res = PQgetResult(conn);
     186           2 :     if (res == NULL)
     187           0 :         pg_fatal("PQgetResult returned null when sync result expected: %s",
     188             :                  PQerrorMessage(conn));
     189             : 
     190           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     191           0 :         pg_fatal("Unexpected result code %s instead of sync result, error: %s",
     192             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     193           2 :     PQclear(res);
     194             : 
     195             :     /* second pipeline */
     196             : 
     197           2 :     res = PQgetResult(conn);
     198           2 :     if (res == NULL)
     199           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     200             :                  PQerrorMessage(conn));
     201             : 
     202           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     203           0 :         pg_fatal("Unexpected result code %s from second pipeline item",
     204             :                  PQresStatus(PQresultStatus(res)));
                           /* free this result before res is overwritten by the next fetch */
                           PQclear(res);
     205             : 
     206           2 :     res = PQgetResult(conn);
     207           2 :     if (res != NULL)
     208           0 :         pg_fatal("Expected null result, got %s",
     209             :                  PQresStatus(PQresultStatus(res)));
     210             : 
     211           2 :     res = PQgetResult(conn);
     212           2 :     if (res == NULL)
     213           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
     214             :                  PQerrorMessage(conn));
     215             : 
     216           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     217           0 :         pg_fatal("Unexpected result code %s from second pipeline sync",
     218             :                  PQresStatus(PQresultStatus(res)));
                           PQclear(res);
     219             : 
     220             :     /* We're still in pipeline mode ... */
     221           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     222           0 :         pg_fatal("Fell out of pipeline mode somehow");
     223             : 
     224             :     /* until we end it, which we can safely do now */
     225           2 :     if (PQexitPipelineMode(conn) != 1)
     226           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     227             :                  PQerrorMessage(conn));
     228             : 
     229           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     230           0 :         pg_fatal("exiting pipeline mode didn't seem to work");
     231             : 
     232           2 :     fprintf(stderr, "ok\n");
     233           2 : }
     234             : 
     235             : /*
     236             :  * Test behavior when a pipeline dispatches a number of commands that are
     237             :  * not flushed by a sync point.
                        *
                        * numqueries parameterless SELECTs are sent in pipeline mode without any
                        * PQpipelineSync.  After each send the output is flushed and the socket is
                        * polled with a zero-timeout select(); available input is consumed.  A
                        * flush request is then sent, and exactly one TUPLES_OK result followed by
                        * a NULL is expected per query.
                        *
                        * Any unexpected outcome terminates the program.  This function does not
                        * call PQexitPipelineMode, so the connection is still in pipeline mode on
                        * return.
     238             :  */
     239             : static void
     240           2 : test_nosync(PGconn *conn)
     241             : {
     242           2 :     int         numqueries = 10;
     243           2 :     int         results = 0;
     244           2 :     int         sock = PQsocket(conn);
     245             : 
     246           2 :     fprintf(stderr, "nosync... ");
     247             : 
     248           2 :     if (sock < 0)
     249           0 :         pg_fatal("invalid socket");
     250             : 
     251           2 :     if (PQenterPipelineMode(conn) != 1)
     252           0 :         pg_fatal("could not enter pipeline mode");
     253          22 :     for (int i = 0; i < numqueries; i++)
     254             :     {
     255             :         fd_set      input_mask;
     256             :         struct timeval tv;
     257             : 
     258          20 :         if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
     259             :                               0, NULL, NULL, NULL, NULL, 0) != 1)
     260           0 :             pg_fatal("error sending select: %s", PQerrorMessage(conn));
     261          20 :         PQflush(conn);
     262             : 
     263             :         /*
     264             :          * If the server has written anything to us, read (some of) it now.
     265             :          */
     266          20 :         FD_ZERO(&input_mask);
     267          20 :         FD_SET(sock, &input_mask);
                               /* zero timeout: poll the socket without blocking */
     268          20 :         tv.tv_sec = 0;
     269          20 :         tv.tv_usec = 0;
     270          20 :         if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
     271             :         {
     272           0 :             fprintf(stderr, "select() failed: %s\n", strerror(errno));
     273           0 :             exit_nicely(conn);
     274             :         }
     275          20 :         if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
     276           0 :             pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
     277             :     }
     278             : 
     279             :     /* tell server to flush its output buffer */
     280           2 :     if (PQsendFlushRequest(conn) != 1)
     281           0 :         pg_fatal("failed to send flush request");
     282           2 :     PQflush(conn);
     283             : 
     284             :     /* Now read all results */
     285             :     for (;;)
     286          18 :     {
     287             :         PGresult   *res;
     288             : 
     289          20 :         res = PQgetResult(conn);
     290             : 
     291             :         /* NULL results are only expected after TUPLES_OK */
     292          20 :         if (res == NULL)
     293           0 :             pg_fatal("got unexpected NULL result after %d results", results);
     294             : 
     295             :         /* We expect exactly one TUPLES_OK result for each query we sent */
     296          20 :         if (PQresultStatus(res) == PGRES_TUPLES_OK)
     297             :         {
     298             :             PGresult   *res2;
     299             : 
     300             :             /* and one NULL result should follow each */
     301          20 :             res2 = PQgetResult(conn);
     302          20 :             if (res2 != NULL)
     303           0 :                 pg_fatal("expected NULL, got %s",
     304             :                          PQresStatus(PQresultStatus(res2)));
     305          20 :             PQclear(res);
     306          20 :             results++;
     307             : 
     308             :             /* if we're done, we're done */
     309          20 :             if (results == numqueries)
     310           2 :                 break;
     311             : 
     312          18 :             continue;
     313             :         }
     314             : 
     315             :         /* anything else is unexpected */
                               /* no trailing newline in the format: pg_fatal() appends one itself */
     316           0 :         pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
     317             :     }
     318             : 
     319           2 :     fprintf(stderr, "ok\n");
     320           2 : }
     321             : 
     322             : /*
     323             :  * When an operation in a pipeline fails the rest of the pipeline is flushed. We
     324             :  * still have to get results for each pipeline item, but the item will just be
     325             :  * a PGRES_PIPELINE_ABORTED code.
     326             :  *
     327             :  * This intentionally doesn't use a transaction to wrap the pipeline. You should
     328             :  * usually use an xact, but in this case we want to observe the effects of each
     329             :  * statement.
     330             :  */
     331             : static void
     332           2 : test_pipeline_abort(PGconn *conn)
     333             : {
     334           2 :     PGresult   *res = NULL;
     335           2 :     const char *dummy_params[1] = {"1"};
     336           2 :     Oid         dummy_param_oids[1] = {INT4OID};
     337             :     int         i;
     338             :     int         gotrows;
     339             :     bool        goterror;
     340             : 
     341           2 :     fprintf(stderr, "aborted pipeline... ");
     342             : 
     343           2 :     res = PQexec(conn, drop_table_sql);
     344           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     345           0 :         pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
     346             : 
     347           2 :     res = PQexec(conn, create_table_sql);
     348           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     349           0 :         pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
     350             : 
     351             :     /*
     352             :      * Queue up a couple of small pipelines and process each without returning
     353             :      * to command mode first. Make sure the second operation in the first
     354             :      * pipeline ERRORs.
     355             :      */
     356           2 :     if (PQenterPipelineMode(conn) != 1)
     357           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     358             : 
     359           2 :     dummy_params[0] = "1";
     360           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     361             :                           dummy_params, NULL, NULL, 0) != 1)
     362           0 :         pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
     363             : 
     364           2 :     if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
     365             :                           1, dummy_param_oids, dummy_params,
     366             :                           NULL, NULL, 0) != 1)
     367           0 :         pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
     368             : 
     369           2 :     dummy_params[0] = "2";
     370           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     371             :                           dummy_params, NULL, NULL, 0) != 1)
     372           0 :         pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
     373             : 
     374           2 :     if (PQpipelineSync(conn) != 1)
     375           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     376             : 
     377           2 :     dummy_params[0] = "3";
     378           2 :     if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
     379             :                           dummy_params, NULL, NULL, 0) != 1)
     380           0 :         pg_fatal("dispatching second-pipeline insert failed: %s",
     381             :                  PQerrorMessage(conn));
     382             : 
     383           2 :     if (PQpipelineSync(conn) != 1)
     384           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     385             : 
     386             :     /*
     387             :      * OK, start processing the pipeline results.
     388             :      *
     389             :      * We should get a command-ok for the first query, then a fatal error and
     390             :      * a pipeline aborted message for the second insert, a pipeline-end, then
     391             :      * a command-ok and a pipeline-ok for the second pipeline operation.
     392             :      */
     393           2 :     res = PQgetResult(conn);
     394           2 :     if (res == NULL)
     395           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     396           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     397           0 :         pg_fatal("Unexpected result status %s: %s",
     398             :                  PQresStatus(PQresultStatus(res)),
     399             :                  PQresultErrorMessage(res));
     400           2 :     PQclear(res);
     401             : 
     402             :     /* NULL result to signal end-of-results for this command */
     403           2 :     if ((res = PQgetResult(conn)) != NULL)
     404           0 :         pg_fatal("Expected null result, got %s",
     405             :                  PQresStatus(PQresultStatus(res)));
     406             : 
     407             :     /* Second query caused error, so we expect an error next */
     408           2 :     res = PQgetResult(conn);
     409           2 :     if (res == NULL)
     410           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     411           2 :     if (PQresultStatus(res) != PGRES_FATAL_ERROR)
     412           0 :         pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
     413             :                  PQresStatus(PQresultStatus(res)));
     414           2 :     PQclear(res);
     415             : 
     416             :     /* NULL result to signal end-of-results for this command */
     417           2 :     if ((res = PQgetResult(conn)) != NULL)
     418           0 :         pg_fatal("Expected null result, got %s",
     419             :                  PQresStatus(PQresultStatus(res)));
     420             : 
     421             :     /*
     422             :      * pipeline should now be aborted.
     423             :      *
     424             :      * Note that we could still queue more queries at this point if we wanted;
     425             :      * they'd get added to a new third pipeline since we've already sent a
     426             :      * second. The aborted flag relates only to the pipeline being received.
     427             :      */
     428           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
     429           0 :         pg_fatal("pipeline should be flagged as aborted but isn't");
     430             : 
     431             :     /* third query in pipeline, the second insert */
     432           2 :     res = PQgetResult(conn);
     433           2 :     if (res == NULL)
     434           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     435           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
     436           0 :         pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
     437             :                  PQresStatus(PQresultStatus(res)));
     438           2 :     PQclear(res);
     439             : 
     440             :     /* NULL result to signal end-of-results for this command */
     441           2 :     if ((res = PQgetResult(conn)) != NULL)
     442           0 :         pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
     443             : 
     444           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
     445           0 :         pg_fatal("pipeline should be flagged as aborted but isn't");
     446             : 
     447             :     /* Ensure we're still in pipeline */
     448           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     449           0 :         pg_fatal("Fell out of pipeline mode somehow");
     450             : 
     451             :     /*
     452             :      * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
     453             :      *
     454             :      * (This is so clients know to start processing results normally again and
     455             :      * can tell the difference between skipped commands and the sync.)
     456             :      */
     457           2 :     res = PQgetResult(conn);
     458           2 :     if (res == NULL)
     459           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     460           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     461           0 :         pg_fatal("Unexpected result code from first pipeline sync\n"
     462             :                  "Expected PGRES_PIPELINE_SYNC, got %s",
     463             :                  PQresStatus(PQresultStatus(res)));
     464           2 :     PQclear(res);
     465             : 
     466           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
     467           0 :         pg_fatal("sync should've cleared the aborted flag but didn't");
     468             : 
     469             :     /* We're still in pipeline mode... */
     470           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     471           0 :         pg_fatal("Fell out of pipeline mode somehow");
     472             : 
     473             :     /* the insert from the second pipeline */
     474           2 :     res = PQgetResult(conn);
     475           2 :     if (res == NULL)
     476           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     477           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     478           0 :         pg_fatal("Unexpected result code %s from first item in second pipeline",
     479             :                  PQresStatus(PQresultStatus(res)));
     480           2 :     PQclear(res);
     481             : 
     482             :     /* Read the NULL result at the end of the command */
     483           2 :     if ((res = PQgetResult(conn)) != NULL)
     484           0 :         pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
     485             : 
     486             :     /* the second pipeline sync */
     487           2 :     if ((res = PQgetResult(conn)) == NULL)
     488           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     489           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     490           0 :         pg_fatal("Unexpected result code %s from second pipeline sync",
     491             :                  PQresStatus(PQresultStatus(res)));
     492           2 :     PQclear(res);
     493             : 
     494           2 :     if ((res = PQgetResult(conn)) != NULL)
     495           0 :         pg_fatal("Expected null result, got %s: %s",
     496             :                  PQresStatus(PQresultStatus(res)),
     497             :                  PQerrorMessage(conn));
     498             : 
     499             :     /* Try to send two queries in one command */
     500           2 :     if (PQsendQuery(conn, "SELECT 1; SELECT 2") != 1)
     501           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
     502           2 :     if (PQpipelineSync(conn) != 1)
     503           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     504           2 :     goterror = false;
     505           4 :     while ((res = PQgetResult(conn)) != NULL)
     506             :     {
     507           2 :         switch (PQresultStatus(res))
     508             :         {
     509           2 :             case PGRES_FATAL_ERROR:
     510           2 :                 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
     511           0 :                     pg_fatal("expected error about multiple commands, got %s",
     512             :                              PQerrorMessage(conn));
     513           2 :                 printf("got expected %s", PQerrorMessage(conn));
     514           2 :                 goterror = true;
     515           2 :                 break;
     516           0 :             default:
     517           0 :                 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
     518             :                 break;
     519             :         }
     520             :     }
     521           2 :     if (!goterror)
     522           0 :         pg_fatal("did not get cannot-insert-multiple-commands error");
     523           2 :     res = PQgetResult(conn);
     524           2 :     if (res == NULL)
     525           0 :         pg_fatal("got NULL result");
     526           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     527           0 :         pg_fatal("Unexpected result code %s from pipeline sync",
     528             :                  PQresStatus(PQresultStatus(res)));
     529           2 :     fprintf(stderr, "ok\n");
     530             : 
     531             :     /* Test single-row mode with an error partway through */
     532           2 :     if (PQsendQuery(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g") != 1)
     533           0 :         pg_fatal("failed to send query: %s", PQerrorMessage(conn));
     534           2 :     if (PQpipelineSync(conn) != 1)
     535           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     536           2 :     PQsetSingleRowMode(conn);
     537           2 :     goterror = false;
     538           2 :     gotrows = 0;
     539          10 :     while ((res = PQgetResult(conn)) != NULL)
     540             :     {
     541           8 :         switch (PQresultStatus(res))
     542             :         {
     543           6 :             case PGRES_SINGLE_TUPLE:
     544           6 :                 printf("got row: %s\n", PQgetvalue(res, 0, 0));
     545           6 :                 gotrows++;
     546           6 :                 break;
     547           2 :             case PGRES_FATAL_ERROR:
     548           2 :                 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
     549           0 :                     pg_fatal("expected division-by-zero, got: %s (%s)",
     550             :                              PQerrorMessage(conn),
     551             :                              PQresultErrorField(res, PG_DIAG_SQLSTATE));
     552           2 :                 printf("got expected division-by-zero\n");
     553           2 :                 goterror = true;
     554           2 :                 break;
     555           0 :             default:
     556           0 :                 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
     557             :         }
     558           8 :         PQclear(res);
     559             :     }
     560           2 :     if (!goterror)
     561           0 :         pg_fatal("did not get division-by-zero error");
     562           2 :     if (gotrows != 3)
     563           0 :         pg_fatal("did not get three rows");
     564             :     /* the third pipeline sync */
     565           2 :     if ((res = PQgetResult(conn)) == NULL)
     566           0 :         pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
     567           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     568           0 :         pg_fatal("Unexpected result code %s from third pipeline sync",
     569             :                  PQresStatus(PQresultStatus(res)));
     570           2 :     PQclear(res);
     571             : 
     572             :     /* We're still in pipeline mode... */
     573           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
     574           0 :         pg_fatal("Fell out of pipeline mode somehow");
     575             : 
     576             :     /* until we end it, which we can safely do now */
     577           2 :     if (PQexitPipelineMode(conn) != 1)
     578           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     579             :                  PQerrorMessage(conn));
     580             : 
     581           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
     582           0 :         pg_fatal("exiting pipeline mode didn't seem to work");
     583             : 
     584           2 :     fprintf(stderr, "ok\n");
     585             : 
     586             :     /*-
     587             :      * Since we fired the pipelines off without a surrounding xact, the results
     588             :      * should be:
     589             :      *
     590             :      * - Implicit xact started by server around 1st pipeline
     591             :      * - First insert applied
     592             :      * - Second statement aborted xact
     593             :      * - Third insert skipped
     594             :      * - Sync rolled back first implicit xact
     595             :      * - Implicit xact created by server around 2nd pipeline
     596             :      * - insert applied from 2nd pipeline
     597             :      * - Sync commits 2nd xact
     598             :      *
     599             :      * So we should only have the value 3 that we inserted.
     600             :      */
     601           2 :     res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
     602             : 
     603           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     604           0 :         pg_fatal("Expected tuples, got %s: %s",
     605             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
     606           2 :     if (PQntuples(res) != 1)
     607           0 :         pg_fatal("expected 1 result, got %d", PQntuples(res));
     608           4 :     for (i = 0; i < PQntuples(res); i++)
     609             :     {
     610           2 :         const char *val = PQgetvalue(res, i, 0);
     611             : 
     612           2 :         if (strcmp(val, "3") != 0)
     613           0 :             pg_fatal("expected only insert with value 3, got %s", val);
     614             :     }
     615             : 
     616           2 :     PQclear(res);
     617           2 : }
     618             : 
     619             : /*
                     :  * State machine enum for test_pipelined_insert.
                     :  *
                     :  * There is one value per item the test queues on the pipeline, listed in
                     :  * the order the items are sent.  test_pipelined_insert moves its send and
                     :  * receive positions forward partly by incrementing them and compares the
                     :  * receive position against BI_DONE with "<", so the declaration order
                     :  * here must match the order of operations.
                     :  */
     620             : enum PipelineInsertStep
     621             : {
     622             :     BI_BEGIN_TX,                /* "BEGIN TRANSACTION" */
     623             :     BI_DROP_TABLE,              /* drop_table_sql */
     624             :     BI_CREATE_TABLE,            /* create_table_sql */
     625             :     BI_PREPARE,                 /* prepare statement "my_insert" */
     626             :     BI_INSERT_ROWS,             /* one execution of "my_insert" per row */
     627             :     BI_COMMIT_TX,               /* "COMMIT" */
     628             :     BI_SYNC,                    /* PQpipelineSync() */
     629             :     BI_DONE                     /* nothing left to send or receive */
     630             : };
     631             : /*
                     :  * test_pipelined_insert
                     :  *
                     :  * Queue a whole transaction on one pipeline while the connection is in
                     :  * nonblocking mode, reading results as they arrive instead of waiting
                     :  * until everything has been sent.
                     :  *
                     :  * conn: connection to run the test on.  It is put into pipeline mode and
                     :  *      nonblocking mode here, and both are switched off again before a
                     :  *      normal return.
                     :  * n_rows: number of executions of the prepared statement to queue.
                     :  *      NOTE(review): the countdown logic below only reaches zero for
                     :  *      n_rows > 0; presumably the caller guarantees that -- confirm.
                     :  *
                     :  * The items sent, in order, are: BEGIN TRANSACTION, drop_table_sql,
                     :  * create_table_sql, a Prepare of insert_sql2 as "my_insert", n_rows
                     :  * executions of that statement, COMMIT, and a pipeline sync.  send_step
                     :  * is the next item to send and recv_step the next result expected.  An
                     :  * unexpected result status or command tag is reported via pg_fatal().
                     :  */
     632             : static void
     633           2 : test_pipelined_insert(PGconn *conn, int n_rows)
     634             : {
     635           2 :     Oid         insert_param_oids[2] = {INT4OID, INT8OID};
     636             :     const char *insert_params[2];
     637             :     char        insert_param_0[MAXINTLEN];
     638             :     char        insert_param_1[MAXINT8LEN];
     639           2 :     enum PipelineInsertStep send_step = BI_BEGIN_TX,
     640           2 :                 recv_step = BI_BEGIN_TX;
     641             :     int         rows_to_send,
     642             :                 rows_to_receive;
     643             : 
     644           2 :     insert_params[0] = insert_param_0;
     645           2 :     insert_params[1] = insert_param_1;
     646             : 
     647           2 :     rows_to_send = rows_to_receive = n_rows;    /* both count down to zero */
     648             : 
     649             :     /*
     650             :      * Do a pipelined insert into a table created at the start of the pipeline
                     :      *
                     :      * The loop below queues the three setup statements back to back,
                     :      * without reading any results, and leaves send_step at BI_PREPARE.
     651             :      */
     652           2 :     if (PQenterPipelineMode(conn) != 1)
     653           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     654             : 
     655           8 :     while (send_step != BI_PREPARE)
     656             :     {
     657             :         const char *sql;
     658             : 
     659           6 :         switch (send_step)
     660             :         {
     661           2 :             case BI_BEGIN_TX:
     662           2 :                 sql = "BEGIN TRANSACTION";
     663           2 :                 send_step = BI_DROP_TABLE;
     664           2 :                 break;
     665             : 
     666           2 :             case BI_DROP_TABLE:
     667           2 :                 sql = drop_table_sql;
     668           2 :                 send_step = BI_CREATE_TABLE;
     669           2 :                 break;
     670             : 
     671           2 :             case BI_CREATE_TABLE:
     672           2 :                 sql = create_table_sql;
     673           2 :                 send_step = BI_PREPARE;
     674           2 :                 break;
     675             : 
     676           0 :             default:
     677           0 :                 pg_fatal("invalid state");
     678             :                 sql = NULL;     /* keep compiler quiet */
     679             :         }
     680             : 
     681             :         pg_debug("sending: %s\n", sql);
     682           6 :         if (PQsendQueryParams(conn, sql,
     683             :                               0, NULL, NULL, NULL, NULL, 0) != 1)
     684           0 :             pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
     685             :     }
     686             : 
     687             :     Assert(send_step == BI_PREPARE);
     688             :     pg_debug("sending: %s\n", insert_sql2);
     689           2 :     if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
     690           0 :         pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
     691           2 :     send_step = BI_INSERT_ROWS;
     692             : 
     693             :     /*
     694             :      * Now we start inserting. We'll be sending enough data that we could fill
     695             :      * our output buffer, so to avoid deadlocking we need to enter nonblocking
     696             :      * mode and consume input while we send more output. As results of each
     697             :      * query are processed we should pop them to allow processing of the next
     698             :      * query. There's no need to finish the pipeline before processing
     699             :      * results.
                     :      *
                     :      * Each pass of the loop below blocks in select() with no timeout until
                     :      * the socket is readable or writable, reads whatever results are
                     :      * available, and then sends at most one further pipeline item.  The
                     :      * loop ends once the result of the sync has been received.
     700             :      */
     701           2 :     if (PQsetnonblocking(conn, 1) != 0)
     702           0 :         pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
     703             : 
     704       36258 :     while (recv_step != BI_DONE)
     705             :     {
     706             :         int         sock;
     707             :         fd_set      input_mask;
     708             :         fd_set      output_mask;
     709             : 
     710       36256 :         sock = PQsocket(conn);
     711             : 
     712       36256 :         if (sock < 0)
     713           0 :             break;              /* shouldn't happen */
     714             : 
     715       36256 :         FD_ZERO(&input_mask);
     716       36256 :         FD_SET(sock, &input_mask);
     717       36256 :         FD_ZERO(&output_mask);
     718       36256 :         FD_SET(sock, &output_mask);
     719             : 
     720       36256 :         if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
     721             :         {
     722           0 :             fprintf(stderr, "select() failed: %s\n", strerror(errno));
     723           0 :             exit_nicely(conn);
     724             :         }
     725             : 
     726             :         /*
     727             :          * Process any results, so we keep the server's output buffer free
     728             :          * flowing and it can continue to process input
     729             :          */
     730       36256 :         if (FD_ISSET(sock, &input_mask))
     731             :         {
     732           4 :             PQconsumeInput(conn);
     733             : 
     734             :             /* Read until we'd block if we tried to read */
     735        2826 :             while (!PQisBusy(conn) && recv_step < BI_DONE)
     736             :             {
     737             :                 PGresult   *res;
     738        2822 :                 const char *cmdtag = "";
     739        2822 :                 const char *description = "";
     740             :                 int         status;
     741             : 
     742             :                 /*
     743             :                  * Read next result.  If no more results from this query,
     744             :                  * advance to the next query
     745             :                  */
     746        2822 :                 res = PQgetResult(conn);
     747        2822 :                 if (res == NULL)
     748        1410 :                     continue;
     749             : 
     750        1412 :                 status = PGRES_COMMAND_OK;  /* default; only BI_SYNC overrides it */
     751        1412 :                 switch (recv_step)
     752             :                 {
     753           2 :                     case BI_BEGIN_TX:
     754           2 :                         cmdtag = "BEGIN";
     755           2 :                         recv_step++;
     756           2 :                         break;
     757           2 :                     case BI_DROP_TABLE:
     758           2 :                         cmdtag = "DROP TABLE";
     759           2 :                         recv_step++;
     760           2 :                         break;
     761           2 :                     case BI_CREATE_TABLE:
     762           2 :                         cmdtag = "CREATE TABLE";
     763           2 :                         recv_step++;
     764           2 :                         break;
     765           2 :                     case BI_PREPARE:
     766           2 :                         cmdtag = "";
     767           2 :                         description = "PREPARE";
     768           2 :                         recv_step++;
     769           2 :                         break;
     770        1400 :                     case BI_INSERT_ROWS:
     771        1400 :                         cmdtag = "INSERT";
     772        1400 :                         rows_to_receive--;
     773        1400 :                         if (rows_to_receive == 0)
     774           2 :                             recv_step++;
     775        1400 :                         break;
     776           2 :                     case BI_COMMIT_TX:
     777           2 :                         cmdtag = "COMMIT";
     778           2 :                         recv_step++;
     779           2 :                         break;
     780           2 :                     case BI_SYNC:
     781           2 :                         cmdtag = "";
     782           2 :                         description = "SYNC";
     783           2 :                         status = PGRES_PIPELINE_SYNC;
     784           2 :                         recv_step++;
     785           2 :                         break;
     786           0 :                     case BI_DONE:
     787             :                         /* unreachable */
     788           0 :                         pg_fatal("unreachable state");
     789             :                 }
     790             : 
     791        1412 :                 if (PQresultStatus(res) != status)
     792           0 :                     pg_fatal("%s reported status %s, expected %s\n"
     793             :                              "Error message: \"%s\"",
     794             :                              description, PQresStatus(PQresultStatus(res)),
     795             :                              PQresStatus(status), PQerrorMessage(conn));
     796             :                 /* prefix comparison only, so an empty cmdtag accepts any tag */
     797        1412 :                 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
     798           0 :                     pg_fatal("%s expected command tag '%s', got '%s'",
     799             :                              description, cmdtag, PQcmdStatus(res));
     800             : 
     801             :                 pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
     802             : 
     803        1412 :                 PQclear(res);
     804             :             }
     805             :         }
     806             : 
     807             :         /* Write more rows and/or the end pipeline message, if needed
                     :          *
                     :          * At most one item is sent per pass.  When a send call fails only a
                     :          * warning is printed and send_step is left unchanged, so the same
                     :          * item is attempted again on a later pass.
                     :          */
     808       36256 :         if (FD_ISSET(sock, &output_mask))
     809             :         {
     810       36254 :             PQflush(conn);
     811             : 
     812       36254 :             if (send_step == BI_INSERT_ROWS)
     813             :             {
     814        1400 :                 snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
     815             :                 /* use up some buffer space with a wide value */
     816        1400 :                 snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
     817             : 
     818        1400 :                 if (PQsendQueryPrepared(conn, "my_insert",
     819             :                                         2, insert_params, NULL, NULL, 0) == 1)
     820             :                 {
     821             :                     pg_debug("sent row %d\n", rows_to_send);
     822             : 
     823        1400 :                     rows_to_send--;
     824        1400 :                     if (rows_to_send == 0)
     825           2 :                         send_step++;
     826             :                 }
     827             :                 else
     828             :                 {
     829             :                     /*
     830             :                      * in nonblocking mode, so it's OK for an insert to fail
     831             :                      * to send
     832             :                      */
     833           0 :                     fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
     834             :                             rows_to_send, PQerrorMessage(conn));
     835             :                 }
     836             :             }
     837       34854 :             else if (send_step == BI_COMMIT_TX)
     838             :             {
     839           2 :                 if (PQsendQueryParams(conn, "COMMIT",
     840             :                                       0, NULL, NULL, NULL, NULL, 0) == 1)
     841             :                 {
     842             :                     pg_debug("sent COMMIT\n");
     843           2 :                     send_step++;
     844             :                 }
     845             :                 else
     846             :                 {
     847           0 :                     fprintf(stderr, "WARNING: failed to send commit: %s\n",
     848             :                             PQerrorMessage(conn));
     849             :                 }
     850             :             }
     851       34852 :             else if (send_step == BI_SYNC)
     852             :             {
     853           2 :                 if (PQpipelineSync(conn) == 1)
     854             :                 {
     855           2 :                     fprintf(stdout, "pipeline sync sent\n");
     856           2 :                     send_step++;
     857             :                 }
     858             :                 else
     859             :                 {
     860           0 :                     fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
     861             :                             PQerrorMessage(conn));
     862             :                 }
     863             :             }
     864             :         }
     865             :     }
     866             : 
     867             :     /* We've got the sync message and the pipeline should be done */
     868           2 :     if (PQexitPipelineMode(conn) != 1)
     869           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
     870             :                  PQerrorMessage(conn));
     871             : 
     872           2 :     if (PQsetnonblocking(conn, 0) != 0)
     873           0 :         pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
     874             : 
     875           2 :     fprintf(stderr, "ok\n");
     876           2 : }
     877             : /*
                     :  * test_prepared
                     :  *
                     :  * Check that Describe of a prepared statement and of a portal work in
                     :  * pipeline mode.
                     :  *
                     :  * First pipeline: prepare "select_one", describe it, sync.  The describe
                     :  * result must list exactly the four column types in expected_oids.
                     :  *
                     :  * Second pipeline: after declaring cursor_one with plain PQexec calls,
                     :  * re-enter pipeline mode, describe the portal, sync.  The first column of
                     :  * the describe result must be int4.
                     :  *
                     :  * conn: connection to run the test on; it must not be in pipeline mode on
                     :  *      entry and is out of pipeline mode again on return.  Note that the
                     :  *      transaction opened by BEGIN is not closed here.
                     :  *
                     :  * Every result obtained from libpq is released with PQclear().  Failures
                     :  * are reported via pg_fatal().
                     :  */
     878             : static void
     879           2 : test_prepared(PGconn *conn)
     880             : {
     881           2 :     PGresult   *res = NULL;
     882           2 :     Oid         param_oids[1] = {INT4OID};
     883             :     Oid         expected_oids[4];
     884             :     Oid         typ;
     885             : 
     886           2 :     fprintf(stderr, "prepared... ");
     887             : 
     888           2 :     if (PQenterPipelineMode(conn) != 1)
     889           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     890           2 :     if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
     891             :                       "interval '1 sec'",
     892             :                       1, param_oids) != 1)
     893           0 :         pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
     894           2 :     expected_oids[0] = INT4OID;
     895           2 :     expected_oids[1] = TEXTOID;
     896           2 :     expected_oids[2] = NUMERICOID;
     897           2 :     expected_oids[3] = INTERVALOID;
     898           2 :     if (PQsendDescribePrepared(conn, "select_one") != 1)
     899           0 :         pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
     900           2 :     if (PQpipelineSync(conn) != 1)
     901           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     902             : 
                     :     /* result of the Prepare, followed by its NULL terminator */
     903           2 :     res = PQgetResult(conn);
     904           2 :     if (res == NULL)
     905           0 :         pg_fatal("PQgetResult returned null");
     906           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     907           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
     908           2 :     PQclear(res);
     909           2 :     res = PQgetResult(conn);
     910           2 :     if (res != NULL)
     911           0 :         pg_fatal("expected NULL result");
     912             : 
                     :     /* result of the Describe: column metadata only, no rows */
     913           2 :     res = PQgetResult(conn);
     914           2 :     if (res == NULL)
     915           0 :         pg_fatal("PQgetResult returned NULL");
     916           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     917           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
     918           2 :     if (PQnfields(res) != lengthof(expected_oids))
     919           0 :         pg_fatal("expected %d columns, got %d",
     920             :                  (int) lengthof(expected_oids), PQnfields(res));
     921          10 :     for (int i = 0; i < PQnfields(res); i++)
     922             :     {
     923           8 :         typ = PQftype(res, i);
     924           8 :         if (typ != expected_oids[i])
     925           0 :             pg_fatal("field %d: expected type %u, got %u",
     926             :                      i, expected_oids[i], typ);
     927             :     }
     928           2 :     PQclear(res);
     929           2 :     res = PQgetResult(conn);
     930           2 :     if (res != NULL)
     931           0 :         pg_fatal("expected NULL result");
     932             : 
     933           2 :     res = PQgetResult(conn);
     934           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     935           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
                     :     PQclear(res);
     936             : 
     937           2 :     if (PQexitPipelineMode(conn) != 1)
     938           0 :         pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
     939             : 
                     :     /* the setup results are not inspected, but must still be freed */
     940           2 :     PQclear(PQexec(conn, "BEGIN"));
     941           2 :     PQclear(PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1"));
     942           2 :     if (PQenterPipelineMode(conn) != 1)
                     :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     943           2 :     if (PQsendDescribePortal(conn, "cursor_one") != 1)
     944           0 :         pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
     945           2 :     if (PQpipelineSync(conn) != 1)
     946           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
     947           2 :     res = PQgetResult(conn);
     948           2 :     if (res == NULL)
     949           0 :         pg_fatal("PQgetResult returned null");
     950           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     951           0 :         pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
     952             : 
     953           2 :     typ = PQftype(res, 0);
     954           2 :     if (typ != INT4OID)
     955           0 :         pg_fatal("portal: expected type %u, got %u",
     956             :                  INT4OID, typ);
     957           2 :     PQclear(res);
     958           2 :     res = PQgetResult(conn);
     959           2 :     if (res != NULL)
     960           0 :         pg_fatal("expected NULL result");
     961           2 :     res = PQgetResult(conn);
     962           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
     963           0 :         pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
                     :     PQclear(res);
     964             : 
     965           2 :     if (PQexitPipelineMode(conn) != 1)
     966           0 :         pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
     967             : 
     968           2 :     fprintf(stderr, "ok\n");
     969           2 : }
     970             : /*
                     :  * test_simple_pipeline
                     :  *
                     :  * Smallest possible pipeline: one parameterized SELECT followed by a
                     :  * sync, sent and read back on a blocking connection.
                     :  *
                     :  * Besides checking the two results (PGRES_TUPLES_OK, then
                     :  * PGRES_PIPELINE_SYNC), the test verifies that PQexitPipelineMode()
                     :  * refuses to leave pipeline mode while the query or the sync is still
                     :  * outstanding, and succeeds once everything has been consumed.
                     :  *
                     :  * conn: connection to run the test on; it must be in blocking mode and is
                     :  *      out of pipeline mode again on return.
                     :  *
                     :  * Failures are reported via pg_fatal().
                     :  */
     971             : static void
     972           2 : test_simple_pipeline(PGconn *conn)
     973             : {
     974           2 :     PGresult   *res = NULL;
     975           2 :     const char *dummy_params[1] = {"1"};
     976           2 :     Oid         dummy_param_oids[1] = {INT4OID};
     977             : 
     978           2 :     fprintf(stderr, "simple pipeline... ");
     979             : 
     980             :     /*
     981             :      * Enter pipeline mode and dispatch a set of operations, which we'll then
     982             :      * process the results of as they come in.
     983             :      *
     984             :      * For a simple case we should be able to do this without interim
     985             :      * processing of results since our output buffer will give us enough slush
     986             :      * to work with and we won't block on sending. So blocking mode is fine.
     987             :      */
     988           2 :     if (PQisnonblocking(conn))
     989           0 :         pg_fatal("Expected blocking connection mode");
     990             : 
     991           2 :     if (PQenterPipelineMode(conn) != 1)
     992           0 :         pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
     993             : 
     994           2 :     if (PQsendQueryParams(conn, "SELECT $1",
     995             :                           1, dummy_param_oids, dummy_params,
     996             :                           NULL, NULL, 0) != 1)
     997           0 :         pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
     998             : 
     999           2 :     if (PQexitPipelineMode(conn) != 0)
    1000           0 :         pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
    1001             : 
    1002           2 :     if (PQpipelineSync(conn) != 1)
    1003           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1004             : 
    1005           2 :     res = PQgetResult(conn);
    1006           2 :     if (res == NULL)
    1007           0 :         pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
    1008             :                  PQerrorMessage(conn));
    1009             : 
    1010           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1011           0 :         pg_fatal("Unexpected result code %s from first pipeline item",
    1012             :                  PQresStatus(PQresultStatus(res)));
    1013             : 
    1014           2 :     PQclear(res);
    1015           2 :     res = NULL;
    1016             : 
    1017           2 :     if (PQgetResult(conn) != NULL)
    1018           0 :         pg_fatal("PQgetResult returned something extra after first query result.");
    1019             : 
    1020             :     /*
    1021             :      * Even though we've processed the result there's still a sync to come and
    1022             :      * we can't exit pipeline mode yet
    1023             :      */
    1024           2 :     if (PQexitPipelineMode(conn) != 0)
    1025           0 :         pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
    1026             : 
    1027           2 :     res = PQgetResult(conn);
    1028           2 :     if (res == NULL)
    1029           0 :         pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
    1030             :                  PQerrorMessage(conn));
    1031             : 
    1032           2 :     if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
    1033           0 :         pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
    1034             :                  PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
    1035             : 
    1036           2 :     PQclear(res);
    1037           2 :     res = NULL;
    1038             : 
                     :     /* keep the unexpected result, so the message reports its real status */
    1039           2 :     if ((res = PQgetResult(conn)) != NULL)
    1040           0 :         pg_fatal("PQgetResult returned something extra after pipeline end: %s",
    1041             :                  PQresStatus(PQresultStatus(res)));
    1042             : 
    1043             :     /* We're still in pipeline mode... */
    1044           2 :     if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
    1045           0 :         pg_fatal("Fell out of pipeline mode somehow");
    1046             : 
    1047             :     /* ... until we end it, which we can safely do now */
    1048           2 :     if (PQexitPipelineMode(conn) != 1)
    1049           0 :         pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
    1050             :                  PQerrorMessage(conn));
    1051             : 
    1052           2 :     if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
    1053           0 :         pg_fatal("Exiting pipeline mode didn't seem to work");
    1054             : 
    1055           2 :     fprintf(stderr, "ok\n");
    1056           2 : }
    1057             : 
    1058             : static void
    1059           2 : test_singlerowmode(PGconn *conn)
    1060             : {
    1061             :     PGresult   *res;
    1062             :     int         i;
    1063           2 :     bool        pipeline_ended = false;
    1064             : 
    1065             :     /* 1 pipeline, 3 queries in it */
    1066           2 :     if (PQenterPipelineMode(conn) != 1)
    1067           0 :         pg_fatal("failed to enter pipeline mode: %s",
    1068             :                  PQerrorMessage(conn));
    1069             : 
    1070           8 :     for (i = 0; i < 3; i++)
    1071             :     {
    1072             :         char       *param[1];
    1073             : 
    1074           6 :         param[0] = psprintf("%d", 44 + i);
    1075             : 
    1076           6 :         if (PQsendQueryParams(conn,
    1077             :                               "SELECT generate_series(42, $1)",
    1078             :                               1,
    1079             :                               NULL,
    1080             :                               (const char **) param,
    1081             :                               NULL,
    1082             :                               NULL,
    1083             :                               0) != 1)
    1084           0 :             pg_fatal("failed to send query: %s",
    1085             :                      PQerrorMessage(conn));
    1086           6 :         pfree(param[0]);
    1087             :     }
    1088           2 :     if (PQpipelineSync(conn) != 1)
    1089           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1090             : 
    1091          10 :     for (i = 0; !pipeline_ended; i++)
    1092             :     {
    1093           8 :         bool        first = true;
    1094             :         bool        saw_ending_tuplesok;
    1095           8 :         bool        isSingleTuple = false;
    1096             : 
    1097             :         /* Set single row mode for only first 2 SELECT queries */
    1098           8 :         if (i < 2)
    1099             :         {
    1100           4 :             if (PQsetSingleRowMode(conn) != 1)
    1101           0 :                 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
    1102             :         }
    1103             : 
    1104             :         /* Consume rows for this query */
    1105           8 :         saw_ending_tuplesok = false;
    1106          28 :         while ((res = PQgetResult(conn)) != NULL)
    1107             :         {
    1108          22 :             ExecStatusType est = PQresultStatus(res);
    1109             : 
    1110          22 :             if (est == PGRES_PIPELINE_SYNC)
    1111             :             {
    1112           2 :                 fprintf(stderr, "end of pipeline reached\n");
    1113           2 :                 pipeline_ended = true;
    1114           2 :                 PQclear(res);
    1115           2 :                 if (i != 3)
    1116           0 :                     pg_fatal("Expected three results, got %d", i);
    1117           2 :                 break;
    1118             :             }
    1119             : 
    1120             :             /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
    1121          20 :             if (first)
    1122             :             {
    1123           6 :                 if (i <= 1 && est != PGRES_SINGLE_TUPLE)
    1124           0 :                     pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
    1125             :                              i, PQresStatus(est));
    1126           6 :                 if (i >= 2 && est != PGRES_TUPLES_OK)
    1127           0 :                     pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
    1128             :                              i, PQresStatus(est));
    1129           6 :                 first = false;
    1130             :             }
    1131             : 
    1132          20 :             fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
    1133          20 :             switch (est)
    1134             :             {
    1135           6 :                 case PGRES_TUPLES_OK:
    1136           6 :                     fprintf(stderr, ", tuples: %d\n", PQntuples(res));
    1137           6 :                     saw_ending_tuplesok = true;
    1138           6 :                     if (isSingleTuple)
    1139             :                     {
    1140           4 :                         if (PQntuples(res) == 0)
    1141           4 :                             fprintf(stderr, "all tuples received in query %d\n", i);
    1142             :                         else
    1143           0 :                             pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
    1144             :                     }
    1145           6 :                     break;
    1146             : 
    1147          14 :                 case PGRES_SINGLE_TUPLE:
    1148          14 :                     isSingleTuple = true;
    1149          14 :                     fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
    1150          14 :                     break;
    1151             : 
    1152           0 :                 default:
    1153           0 :                     pg_fatal("unexpected");
    1154             :             }
    1155          20 :             PQclear(res);
    1156             :         }
    1157           8 :         if (!pipeline_ended && !saw_ending_tuplesok)
    1158           0 :             pg_fatal("didn't get expected terminating TUPLES_OK");
    1159             :     }
    1160             : 
    1161           2 :     if (PQexitPipelineMode(conn) != 1)
    1162           0 :         pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
    1163           2 : }
    1164             : 
    1165             : /*
    1166             :  * Simple test to verify that a pipeline is discarded as a whole when there's
    1167             :  * an error, ignoring transaction commands.
    1168             :  */
    1169             : static void
    1170           2 : test_transaction(PGconn *conn)
    1171             : {
    1172             :     PGresult   *res;
    1173             :     bool        expect_null;
    1174           2 :     int         num_syncs = 0;
    1175             : 
    1176           2 :     res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
    1177             :                  "CREATE TABLE pq_pipeline_tst (id int)");
    1178           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1179           0 :         pg_fatal("failed to create test table: %s",
    1180             :                  PQerrorMessage(conn));
    1181           2 :     PQclear(res);
    1182             : 
    1183           2 :     if (PQenterPipelineMode(conn) != 1)
    1184           0 :         pg_fatal("failed to enter pipeline mode: %s",
    1185             :                  PQerrorMessage(conn));
    1186           2 :     if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
    1187           0 :         pg_fatal("could not send prepare on pipeline: %s",
    1188             :                  PQerrorMessage(conn));
    1189             : 
    1190           2 :     if (PQsendQueryParams(conn,
    1191             :                           "BEGIN",
    1192             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1193           0 :         pg_fatal("failed to send query: %s",
    1194             :                  PQerrorMessage(conn));
    1195           2 :     if (PQsendQueryParams(conn,
    1196             :                           "SELECT 0/0",
    1197             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1198           0 :         pg_fatal("failed to send query: %s",
    1199             :                  PQerrorMessage(conn));
    1200             : 
    1201             :     /*
    1202             :      * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
    1203             :      * get out of the pipeline-aborted state first.
    1204             :      */
    1205           2 :     if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
    1206           0 :         pg_fatal("failed to execute prepared: %s",
    1207             :                  PQerrorMessage(conn));
    1208             : 
    1209             :     /* This insert fails because we're in pipeline-aborted state */
    1210           2 :     if (PQsendQueryParams(conn,
    1211             :                           "INSERT INTO pq_pipeline_tst VALUES (1)",
    1212             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1213           0 :         pg_fatal("failed to send query: %s",
    1214             :                  PQerrorMessage(conn));
    1215           2 :     if (PQpipelineSync(conn) != 1)
    1216           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1217           2 :     num_syncs++;
    1218             : 
    1219             :     /*
    1220             :      * This insert fails even though the pipeline got a SYNC, because we're in
    1221             :      * an aborted transaction
    1222             :      */
    1223           2 :     if (PQsendQueryParams(conn,
    1224             :                           "INSERT INTO pq_pipeline_tst VALUES (2)",
    1225             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1226           0 :         pg_fatal("failed to send query: %s",
    1227             :                  PQerrorMessage(conn));
    1228           2 :     if (PQpipelineSync(conn) != 1)
    1229           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1230           2 :     num_syncs++;
    1231             : 
    1232             :     /*
    1233             :      * Send ROLLBACK using prepared stmt. This one works because we just did
    1234             :      * PQpipelineSync above.
    1235             :      */
    1236           2 :     if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
    1237           0 :         pg_fatal("failed to execute prepared: %s",
    1238             :                  PQerrorMessage(conn));
    1239             : 
    1240             :     /*
    1241             :      * Now that we're out of a transaction and in pipeline-good mode, this
    1242             :      * insert works
    1243             :      */
    1244           2 :     if (PQsendQueryParams(conn,
    1245             :                           "INSERT INTO pq_pipeline_tst VALUES (3)",
    1246             :                           0, NULL, NULL, NULL, NULL, 0) != 1)
    1247           0 :         pg_fatal("failed to send query: %s",
    1248             :                  PQerrorMessage(conn));
    1249             :     /* Send two syncs now -- match up to SYNC messages below */
    1250           2 :     if (PQpipelineSync(conn) != 1)
    1251           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1252           2 :     num_syncs++;
    1253           2 :     if (PQpipelineSync(conn) != 1)
    1254           0 :         pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
    1255           2 :     num_syncs++;
    1256             : 
    1257           2 :     expect_null = false;
    1258           2 :     for (int i = 0;; i++)
    1259          38 :     {
    1260             :         ExecStatusType restype;
    1261             : 
    1262          40 :         res = PQgetResult(conn);
    1263          40 :         if (res == NULL)
    1264             :         {
    1265          16 :             printf("%d: got NULL result\n", i);
    1266          16 :             if (!expect_null)
    1267           0 :                 pg_fatal("did not expect NULL here");
    1268          16 :             expect_null = false;
    1269          16 :             continue;
    1270             :         }
    1271          24 :         restype = PQresultStatus(res);
    1272          24 :         printf("%d: got status %s", i, PQresStatus(restype));
    1273          24 :         if (expect_null)
    1274           0 :             pg_fatal("expected NULL");
    1275          24 :         if (restype == PGRES_FATAL_ERROR)
    1276           4 :             printf("; error: %s", PQerrorMessage(conn));
    1277          20 :         else if (restype == PGRES_PIPELINE_ABORTED)
    1278             :         {
    1279           4 :             printf(": command didn't run because pipeline aborted\n");
    1280             :         }
    1281             :         else
    1282          16 :             printf("\n");
    1283          24 :         PQclear(res);
    1284             : 
    1285          24 :         if (restype == PGRES_PIPELINE_SYNC)
    1286           8 :             num_syncs--;
    1287             :         else
    1288          16 :             expect_null = true;
    1289          24 :         if (num_syncs <= 0)
    1290           2 :             break;
    1291             :     }
    1292           2 :     if (PQgetResult(conn) != NULL)
    1293           0 :         pg_fatal("returned something extra after all the syncs: %s",
    1294             :                  PQresStatus(PQresultStatus(res)));
    1295             : 
    1296           2 :     if (PQexitPipelineMode(conn) != 1)
    1297           0 :         pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
    1298             : 
    1299             :     /* We expect to find one tuple containing the value "3" */
    1300           2 :     res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
    1301           2 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
    1302           0 :         pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
    1303           2 :     if (PQntuples(res) != 1)
    1304           0 :         pg_fatal("did not get 1 tuple");
    1305           2 :     if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
    1306           0 :         pg_fatal("did not get expected tuple");
    1307           2 :     PQclear(res);
    1308             : 
    1309           2 :     fprintf(stderr, "ok\n");
    1310           2 : }
    1311             : 
    1312             : /*
    1313             :  * In this test mode we send a stream of queries, with one in the middle
    1314             :  * causing an error.  Verify that we can still send some more after the
    1315             :  * error and have libpq work properly.
    1316             :  */
    1317             : static void
    1318           2 : test_uniqviol(PGconn *conn)
    1319             : {
    1320           2 :     int         sock = PQsocket(conn);
    1321             :     PGresult   *res;
    1322           2 :     Oid         paramTypes[2] = {INT8OID, INT8OID};
    1323             :     const char *paramValues[2];
    1324             :     char        paramValue0[MAXINT8LEN];
    1325             :     char        paramValue1[MAXINT8LEN];
    1326           2 :     int         ctr = 0;
    1327           2 :     int         numsent = 0;
    1328           2 :     int         results = 0;
    1329           2 :     bool        read_done = false;
    1330           2 :     bool        write_done = false;
    1331           2 :     bool        error_sent = false;
    1332           2 :     bool        got_error = false;
    1333           2 :     int         switched = 0;
    1334           2 :     int         socketful = 0;
    1335             :     fd_set      in_fds;
    1336             :     fd_set      out_fds;
    1337             : 
    1338           2 :     fprintf(stderr, "uniqviol ...");
    1339             : 
    1340           2 :     PQsetnonblocking(conn, 1);
    1341             : 
    1342           2 :     paramValues[0] = paramValue0;
    1343           2 :     paramValues[1] = paramValue1;
    1344           2 :     sprintf(paramValue1, "42");
    1345             : 
    1346           2 :     res = PQexec(conn, "drop table if exists ppln_uniqviol;"
    1347             :                  "create table ppln_uniqviol(id bigint primary key, idata bigint)");
    1348           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1349           0 :         pg_fatal("failed to create table: %s", PQerrorMessage(conn));
    1350             : 
    1351           2 :     res = PQexec(conn, "begin");
    1352           2 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1353           0 :         pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
    1354             : 
    1355           2 :     res = PQprepare(conn, "insertion",
    1356             :                     "insert into ppln_uniqviol values ($1, $2) returning id",
    1357             :                     2, paramTypes);
    1358           2 :     if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
    1359           0 :         pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
    1360             : 
    1361           2 :     if (PQenterPipelineMode(conn) != 1)
    1362           0 :         pg_fatal("failed to enter pipeline mode");
    1363             : 
    1364          16 :     while (!read_done)
    1365             :     {
    1366             :         /*
    1367             :          * Avoid deadlocks by reading everything the server has sent before
    1368             :          * sending anything.  (Special precaution is needed here to process
    1369             :          * PQisBusy before testing the socket for read-readiness, because the
    1370             :          * socket does not turn read-ready after "sending" queries in aborted
    1371             :          * pipeline mode.)
    1372             :          */
    1373        1212 :         while (PQisBusy(conn) == 0)
    1374             :         {
    1375             :             bool        new_error;
    1376             : 
    1377        1202 :             if (results >= numsent)
    1378             :             {
    1379           2 :                 if (write_done)
    1380           0 :                     read_done = true;
    1381           2 :                 break;
    1382             :             }
    1383             : 
    1384        1200 :             res = PQgetResult(conn);
    1385        1200 :             new_error = process_result(conn, res, results, numsent);
    1386        1200 :             if (new_error && got_error)
    1387           0 :                 pg_fatal("got two errors");
    1388        1200 :             got_error |= new_error;
    1389        1200 :             if (results++ >= numsent - 1)
    1390             :             {
    1391           4 :                 if (write_done)
    1392           2 :                     read_done = true;
    1393           4 :                 break;
    1394             :             }
    1395             :         }
    1396             : 
    1397          16 :         if (read_done)
    1398           2 :             break;
    1399             : 
    1400          14 :         FD_ZERO(&out_fds);
    1401          14 :         FD_SET(sock, &out_fds);
    1402             : 
    1403          14 :         FD_ZERO(&in_fds);
    1404          14 :         FD_SET(sock, &in_fds);
    1405             : 
    1406          14 :         if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
    1407             :         {
    1408           0 :             if (errno == EINTR)
    1409           0 :                 continue;
    1410           0 :             pg_fatal("select() failed: %m");
    1411             :         }
    1412             : 
    1413          14 :         if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
    1414           0 :             pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
    1415             : 
    1416             :         /*
    1417             :          * If the socket is writable and we haven't finished sending queries,
    1418             :          * send some.
    1419             :          */
    1420          14 :         if (!write_done && FD_ISSET(sock, &out_fds))
    1421             :         {
    1422             :             for (;;)
    1423        1194 :             {
    1424             :                 int         flush;
    1425             : 
    1426             :                 /*
    1427             :                  * provoke uniqueness violation exactly once after having
    1428             :                  * switched to read mode.
    1429             :                  */
    1430        1200 :                 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
    1431             :                 {
    1432           2 :                     sprintf(paramValue0, "%d", numsent / 2);
    1433           2 :                     fprintf(stderr, "E");
    1434           2 :                     error_sent = true;
    1435             :                 }
    1436             :                 else
    1437             :                 {
    1438        1198 :                     fprintf(stderr, ".");
    1439        1198 :                     sprintf(paramValue0, "%d", ctr++);
    1440             :                 }
    1441             : 
    1442        1200 :                 if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
    1443           0 :                     pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
    1444        1200 :                 numsent++;
    1445             : 
    1446             :                 /* Are we done writing? */
    1447        1200 :                 if (socketful != 0 && numsent % socketful == 42 && error_sent)
    1448             :                 {
    1449           2 :                     if (PQsendFlushRequest(conn) != 1)
    1450           0 :                         pg_fatal("failed to send flush request");
    1451           2 :                     write_done = true;
    1452           2 :                     fprintf(stderr, "\ndone writing\n");
    1453           2 :                     PQflush(conn);
    1454           2 :                     break;
    1455             :                 }
    1456             : 
    1457             :                 /* is the outgoing socket full? */
    1458        1198 :                 flush = PQflush(conn);
    1459        1198 :                 if (flush == -1)
    1460           0 :                     pg_fatal("failed to flush: %s", PQerrorMessage(conn));
    1461        1198 :                 if (flush == 1)
    1462             :                 {
    1463           4 :                     if (socketful == 0)
    1464           2 :                         socketful = numsent;
    1465           4 :                     fprintf(stderr, "\nswitch to reading\n");
    1466           4 :                     switched++;
    1467           4 :                     break;
    1468             :                 }
    1469             :             }
    1470             :         }
    1471             :     }
    1472             : 
    1473           2 :     if (!got_error)
    1474           0 :         pg_fatal("did not get expected error");
    1475             : 
    1476           2 :     fprintf(stderr, "ok\n");
    1477           2 : }
    1478             : 
    1479             : /*
    1480             :  * Subroutine for test_uniqviol; given a PGresult, print it out and consume
    1481             :  * the expected NULL that should follow it.
    1482             :  *
    1483             :  * Returns true if we read a fatal error message, otherwise false.
    1484             :  */
    1485             : static bool
    1486        1200 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
    1487             : {
    1488             :     PGresult   *res2;
    1489        1200 :     bool        got_error = false;
    1490             : 
    1491        1200 :     if (res == NULL)
    1492           0 :         pg_fatal("got unexpected NULL");
    1493             : 
    1494        1200 :     switch (PQresultStatus(res))
    1495             :     {
    1496           2 :         case PGRES_FATAL_ERROR:
    1497           2 :             got_error = true;
    1498           2 :             fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
    1499           2 :             PQclear(res);
    1500             : 
    1501           2 :             res2 = PQgetResult(conn);
    1502           2 :             if (res2 != NULL)
    1503           0 :                 pg_fatal("expected NULL, got %s",
    1504             :                          PQresStatus(PQresultStatus(res2)));
    1505           2 :             break;
    1506             : 
    1507         836 :         case PGRES_TUPLES_OK:
    1508         836 :             fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
    1509         836 :             PQclear(res);
    1510             : 
    1511         836 :             res2 = PQgetResult(conn);
    1512         836 :             if (res2 != NULL)
    1513           0 :                 pg_fatal("expected NULL, got %s",
    1514             :                          PQresStatus(PQresultStatus(res2)));
    1515         836 :             break;
    1516             : 
    1517         362 :         case PGRES_PIPELINE_ABORTED:
    1518         362 :             fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
    1519         362 :             res2 = PQgetResult(conn);
    1520         362 :             if (res2 != NULL)
    1521           0 :                 pg_fatal("expected NULL, got %s",
    1522             :                          PQresStatus(PQresultStatus(res2)));
    1523         362 :             break;
    1524             : 
    1525           0 :         default:
    1526           0 :             pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
    1527             :     }
    1528             : 
    1529        1200 :     return got_error;
    1530             : }
    1531             : 
    1532             : 
    /*
     * Print the command-line help text to stderr.
     *
     * progname: program name substituted into the help text (it appears three
     *           times).
     */
    static void
    usage(const char *progname)
    {
        /* One call, one format string; the emitted bytes are unchanged. */
        fprintf(stderr,
                "%s tests libpq's pipeline mode.\n\n"
                "Usage:\n"
                "  %s [OPTION] tests\n"
                "  %s [OPTION] TESTNAME [CONNINFO]\n"
                "\nOptions:\n"
                "  -t TRACEFILE       generate a libpq trace to TRACEFILE\n"
                "  -r NUMROWS         use NUMROWS as the test size\n",
                progname, progname, progname);
    }
    1544             : 
    /*
     * Print the name of every available test to stdout, one per line, in the
     * order listed below.
     */
    static void
    print_test_list(void)
    {
        /* NULL-terminated so the loop needs no separate element count */
        static const char *const test_names[] = {
            "disallowed_in_pipeline",
            "multi_pipelines",
            "nosync",
            "pipeline_abort",
            "pipelined_insert",
            "prepared",
            "simple_pipeline",
            "singlerow",
            "transaction",
            "uniqviol",
            NULL
        };
        const char *const *name;

        for (name = test_names; *name != NULL; name++)
            printf("%s\n", *name);
    }
    1559             : 
    1560             : int
    1561          22 : main(int argc, char **argv)
    1562             : {
    1563          22 :     const char *conninfo = "";
    1564             :     PGconn     *conn;
    1565             :     FILE       *trace;
    1566             :     char       *testname;
    1567          22 :     int         numrows = 10000;
    1568             :     PGresult   *res;
    1569             :     int         c;
    1570             : 
    1571          80 :     while ((c = getopt(argc, argv, "t:r:")) != -1)
    1572             :     {
    1573          36 :         switch (c)
    1574             :         {
    1575          16 :             case 't':           /* trace file */
    1576          16 :                 tracefile = pg_strdup(optarg);
    1577          16 :                 break;
    1578          20 :             case 'r':           /* numrows */
    1579          20 :                 errno = 0;
    1580          20 :                 numrows = strtol(optarg, NULL, 10);
    1581          20 :                 if (errno != 0 || numrows <= 0)
    1582             :                 {
    1583           0 :                     fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
    1584             :                             optarg);
    1585           0 :                     exit(1);
    1586             :                 }
    1587          20 :                 break;
    1588             :         }
    1589          58 :     }
    1590             : 
    1591          22 :     if (optind < argc)
    1592             :     {
    1593          22 :         testname = pg_strdup(argv[optind]);
    1594          22 :         optind++;
    1595             :     }
    1596             :     else
    1597             :     {
    1598           0 :         usage(argv[0]);
    1599           0 :         exit(1);
    1600             :     }
    1601             : 
    1602          22 :     if (strcmp(testname, "tests") == 0)
    1603             :     {
    1604           2 :         print_test_list();
    1605           2 :         exit(0);
    1606             :     }
    1607             : 
    1608          20 :     if (optind < argc)
    1609             :     {
    1610          20 :         conninfo = pg_strdup(argv[optind]);
    1611          20 :         optind++;
    1612             :     }
    1613             : 
    1614             :     /* Make a connection to the database */
    1615          20 :     conn = PQconnectdb(conninfo);
    1616          20 :     if (PQstatus(conn) != CONNECTION_OK)
    1617             :     {
    1618           0 :         fprintf(stderr, "Connection to database failed: %s\n",
    1619             :                 PQerrorMessage(conn));
    1620           0 :         exit_nicely(conn);
    1621             :     }
    1622             : 
    1623          20 :     res = PQexec(conn, "SET lc_messages TO \"C\"");
    1624          20 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1625           0 :         pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
    1626          20 :     res = PQexec(conn, "SET force_parallel_mode = off");
    1627          20 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1628           0 :         pg_fatal("failed to set force_parallel_mode: %s", PQerrorMessage(conn));
    1629             : 
    1630             :     /* Set the trace file, if requested */
    1631          20 :     if (tracefile != NULL)
    1632             :     {
    1633          16 :         trace = fopen(tracefile, "w");
    1634          16 :         if (trace == NULL)
    1635           0 :             pg_fatal("could not open file \"%s\": %m", tracefile);
    1636             : 
    1637             :         /* Make it line-buffered */
    1638          16 :         setvbuf(trace, NULL, PG_IOLBF, 0);
    1639             : 
    1640          16 :         PQtrace(conn, trace);
    1641          16 :         PQsetTraceFlags(conn,
    1642             :                         PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
    1643             :     }
    1644             : 
    1645          20 :     if (strcmp(testname, "disallowed_in_pipeline") == 0)
    1646           2 :         test_disallowed_in_pipeline(conn);
    1647          18 :     else if (strcmp(testname, "multi_pipelines") == 0)
    1648           2 :         test_multi_pipelines(conn);
    1649          16 :     else if (strcmp(testname, "nosync") == 0)
    1650           2 :         test_nosync(conn);
    1651          14 :     else if (strcmp(testname, "pipeline_abort") == 0)
    1652           2 :         test_pipeline_abort(conn);
    1653          12 :     else if (strcmp(testname, "pipelined_insert") == 0)
    1654           2 :         test_pipelined_insert(conn, numrows);
    1655          10 :     else if (strcmp(testname, "prepared") == 0)
    1656           2 :         test_prepared(conn);
    1657           8 :     else if (strcmp(testname, "simple_pipeline") == 0)
    1658           2 :         test_simple_pipeline(conn);
    1659           6 :     else if (strcmp(testname, "singlerow") == 0)
    1660           2 :         test_singlerowmode(conn);
    1661           4 :     else if (strcmp(testname, "transaction") == 0)
    1662           2 :         test_transaction(conn);
    1663           2 :     else if (strcmp(testname, "uniqviol") == 0)
    1664           2 :         test_uniqviol(conn);
    1665             :     else
    1666             :     {
    1667           0 :         fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
    1668           0 :         exit(1);
    1669             :     }
    1670             : 
    1671             :     /* close the connection to the database and cleanup */
    1672          20 :     PQfinish(conn);
    1673          20 :     return 0;
    1674             : }

Generated by: LCOV version 1.14