Line data Source code
1 : /*
2 : * src/test/isolation/isolationtester.c
3 : *
4 : * isolationtester.c
5 : * Runs an isolation test specified by a spec file.
6 : */
7 :
#include "postgres_fe.h"

#include <limits.h>
#include <sys/select.h>
#include <sys/time.h>

#include "datatype/timestamp.h"
#include "isolationtester.h"
#include "libpq-fe.h"
#include "pg_getopt.h"
#include "pqexpbuffer.h"
18 :
19 : #define PREP_WAITING "isolationtester_waiting"
20 :
/*
 * conns[0] is the global setup, teardown, and watchdog connection. Additional
 * connections represent spec-defined sessions.
 *
 * The session at 0-based index s in testspec->sessions uses conns[s + 1].
 */
typedef struct IsoConnInfo
{
	/* The libpq connection object for this connection. */
	PGconn	   *conn;
	/* The backend PID, in numeric and string formats. */
	int			backend_pid;
	const char *backend_pid_str;
	/* Name of the associated session. */
	const char *sessionname;
	/* Active step on this connection, or NULL if idle. */
	PermutationStep *active_step;

	/*
	 * Number of NOTICE messages received from connection.  PSB_NUM_NOTICES
	 * blockers compute their target_notices relative to this count.
	 * NOTE(review): presumably incremented by isotesterNoticeProcessor, whose
	 * body is not visible here.
	 */
	int			total_notices;
} IsoConnInfo;

/* Array of nconns entries (control connection + one per session), set up in main(). */
static IsoConnInfo *conns = NULL;
static int	nconns = 0;

/*
 * Flag indicating some new NOTICE has arrived.  try_complete_steps() clears
 * it before each scan of the waiting steps and checks it afterwards to decide
 * whether another scan is needed.
 */
static bool any_new_notice = false;

/*
 * Maximum time to wait before giving up on a step (in usec).  A step that is
 * still running after this long is canceled; after twice this long the tester
 * exits.  main() replaces the default using PG_TEST_TIMEOUT_DEFAULT.
 */
static int64 max_step_wait = 360 * USECS_PER_SEC;


static void check_testspec(TestSpec *testspec);
static void run_testspec(TestSpec *testspec);
static void run_all_permutations(TestSpec *testspec);
static void run_all_permutations_recurse(TestSpec *testspec, int *piles,
										 int nsteps, PermutationStep **steps);
static void run_named_permutations(TestSpec *testspec);
static void run_permutation(TestSpec *testspec, int nsteps,
							PermutationStep **steps);

/* Flag bits for try_complete_step(s) */
#define STEP_NONBLOCK	0x1		/* return as soon as cmd waits for a lock */
#define STEP_RETRY		0x2		/* this is a retry of a previously-waiting cmd */

static int	try_complete_steps(TestSpec *testspec, PermutationStep **waiting,
							   int nwaiting, int flags);
static bool try_complete_step(TestSpec *testspec, PermutationStep *pstep,
							  int flags);

/* Comparators used to sort and look up steps by name in check_testspec(). */
static int	step_qsort_cmp(const void *a, const void *b);
static int	step_bsearch_cmp(const void *a, const void *b);

static bool step_has_blocker(PermutationStep *pstep);
static void printResultSet(PGresult *res);
static void isotesterNoticeProcessor(void *arg, const char *message);
static void blackholeNoticeProcessor(void *arg, const char *message);
75 :
76 : static void
77 274 : disconnect_atexit(void)
78 : {
79 : int i;
80 :
81 1226 : for (i = 0; i < nconns; i++)
82 952 : if (conns[i].conn)
83 952 : PQfinish(conns[i].conn);
84 274 : }
85 :
86 : int
87 290 : main(int argc, char **argv)
88 : {
89 : const char *conninfo;
90 : const char *env_wait;
91 : TestSpec *testspec;
92 : PGresult *res;
93 : PQExpBufferData wait_query;
94 : int opt;
95 : int i;
96 :
97 290 : while ((opt = getopt(argc, argv, "V")) != -1)
98 : {
99 16 : switch (opt)
100 : {
101 16 : case 'V':
102 16 : puts("isolationtester (PostgreSQL) " PG_VERSION);
103 16 : exit(0);
104 0 : default:
105 0 : fprintf(stderr, "Usage: isolationtester [CONNINFO]\n");
106 0 : return EXIT_FAILURE;
107 : }
108 : }
109 :
110 : /*
111 : * Make stdout unbuffered to match stderr; and ensure stderr is unbuffered
112 : * too, which it should already be everywhere except sometimes in Windows.
113 : */
114 274 : setbuf(stdout, NULL);
115 274 : setbuf(stderr, NULL);
116 :
117 : /*
118 : * If the user supplies a non-option parameter on the command line, use it
119 : * as the conninfo string; otherwise default to setting dbname=postgres
120 : * and using environment variables or defaults for all other connection
121 : * parameters.
122 : */
123 274 : if (argc > optind)
124 274 : conninfo = argv[optind];
125 : else
126 0 : conninfo = "dbname = postgres";
127 :
128 : /*
129 : * If PG_TEST_TIMEOUT_DEFAULT is set, adopt its value (given in seconds)
130 : * as half the max time to wait for any one step to complete.
131 : */
132 274 : env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
133 274 : if (env_wait != NULL)
134 0 : max_step_wait = 2 * ((int64) atoi(env_wait)) * USECS_PER_SEC;
135 :
136 : /* Read the test spec from stdin */
137 274 : spec_yyparse();
138 274 : testspec = &parseresult;
139 :
140 : /* Perform post-parse checking, and fill in linking fields */
141 274 : check_testspec(testspec);
142 :
143 274 : printf("Parsed test spec with %d sessions\n", testspec->nsessions);
144 :
145 : /*
146 : * Establish connections to the database, one for each session and an
147 : * extra for lock wait detection and global work.
148 : */
149 274 : nconns = 1 + testspec->nsessions;
150 274 : conns = (IsoConnInfo *) pg_malloc0(nconns * sizeof(IsoConnInfo));
151 274 : atexit(disconnect_atexit);
152 :
153 1226 : for (i = 0; i < nconns; i++)
154 : {
155 : const char *sessionname;
156 :
157 952 : if (i == 0)
158 274 : sessionname = "control connection";
159 : else
160 678 : sessionname = testspec->sessions[i - 1]->name;
161 :
162 952 : conns[i].sessionname = sessionname;
163 :
164 952 : conns[i].conn = PQconnectdb(conninfo);
165 952 : if (PQstatus(conns[i].conn) != CONNECTION_OK)
166 : {
167 0 : fprintf(stderr, "Connection %d failed: %s",
168 0 : i, PQerrorMessage(conns[i].conn));
169 0 : exit(1);
170 : }
171 :
172 : /*
173 : * Set up notice processors for the user-defined connections, so that
174 : * messages can get printed prefixed with the session names. The
175 : * control connection gets a "blackhole" processor instead (hides all
176 : * messages).
177 : */
178 952 : if (i != 0)
179 678 : PQsetNoticeProcessor(conns[i].conn,
180 : isotesterNoticeProcessor,
181 678 : &conns[i]);
182 : else
183 274 : PQsetNoticeProcessor(conns[i].conn,
184 : blackholeNoticeProcessor,
185 : NULL);
186 :
187 : /*
188 : * Similarly, append the session name to application_name to make it
189 : * easier to map spec file sessions to log output and
190 : * pg_stat_activity. The reason to append instead of just setting the
191 : * name is that we don't know the name of the test currently running.
192 : */
193 952 : res = PQexecParams(conns[i].conn,
194 : "SELECT set_config('application_name',\n"
195 : " current_setting('application_name') || '/' || $1,\n"
196 : " false)",
197 : 1, NULL,
198 : &sessionname,
199 : NULL, NULL, 0);
200 952 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
201 : {
202 0 : fprintf(stderr, "setting of application name failed: %s",
203 0 : PQerrorMessage(conns[i].conn));
204 0 : exit(1);
205 : }
206 :
207 : /* Save each connection's backend PID for subsequent use. */
208 952 : conns[i].backend_pid = PQbackendPID(conns[i].conn);
209 952 : conns[i].backend_pid_str = psprintf("%d", conns[i].backend_pid);
210 : }
211 :
212 : /*
213 : * Build the query we'll use to detect lock contention among sessions in
214 : * the test specification. Most of the time, we could get away with
215 : * simply checking whether a session is waiting for *any* lock: we don't
216 : * exactly expect concurrent use of test tables. However, autovacuum will
217 : * occasionally take AccessExclusiveLock to truncate a table, and we must
218 : * ignore that transient wait.
219 : */
220 274 : initPQExpBuffer(&wait_query);
221 274 : appendPQExpBufferStr(&wait_query,
222 : "SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{");
223 : /* The spec syntax requires at least one session; assume that here. */
224 274 : appendPQExpBufferStr(&wait_query, conns[1].backend_pid_str);
225 678 : for (i = 2; i < nconns; i++)
226 404 : appendPQExpBuffer(&wait_query, ",%s", conns[i].backend_pid_str);
227 274 : appendPQExpBufferStr(&wait_query, "}')");
228 :
229 274 : res = PQprepare(conns[0].conn, PREP_WAITING, wait_query.data, 0, NULL);
230 274 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
231 : {
232 0 : fprintf(stderr, "prepare of lock wait query failed: %s",
233 0 : PQerrorMessage(conns[0].conn));
234 0 : exit(1);
235 : }
236 274 : PQclear(res);
237 274 : termPQExpBuffer(&wait_query);
238 :
239 : /*
240 : * Run the permutations specified in the spec, or all if none were
241 : * explicitly specified.
242 : */
243 274 : run_testspec(testspec);
244 :
245 274 : return 0;
246 : }
247 :
248 : /*
249 : * Validity-check the test spec and fill in cross-links between nodes.
250 : */
251 : static void
252 274 : check_testspec(TestSpec *testspec)
253 : {
254 : int nallsteps;
255 : Step **allsteps;
256 : int i,
257 : j,
258 : k;
259 :
260 : /* Create a sorted lookup table of all steps. */
261 274 : nallsteps = 0;
262 952 : for (i = 0; i < testspec->nsessions; i++)
263 678 : nallsteps += testspec->sessions[i]->nsteps;
264 :
265 274 : allsteps = pg_malloc(nallsteps * sizeof(Step *));
266 :
267 274 : k = 0;
268 952 : for (i = 0; i < testspec->nsessions; i++)
269 : {
270 3466 : for (j = 0; j < testspec->sessions[i]->nsteps; j++)
271 2788 : allsteps[k++] = testspec->sessions[i]->steps[j];
272 : }
273 :
274 274 : qsort(allsteps, nallsteps, sizeof(Step *), step_qsort_cmp);
275 :
276 : /* Verify that all step names are unique. */
277 2788 : for (i = 1; i < nallsteps; i++)
278 : {
279 2514 : if (strcmp(allsteps[i - 1]->name,
280 2514 : allsteps[i]->name) == 0)
281 : {
282 0 : fprintf(stderr, "duplicate step name: %s\n",
283 0 : allsteps[i]->name);
284 0 : exit(1);
285 : }
286 : }
287 :
288 : /* Set the session index fields in steps. */
289 952 : for (i = 0; i < testspec->nsessions; i++)
290 : {
291 678 : Session *session = testspec->sessions[i];
292 :
293 3466 : for (j = 0; j < session->nsteps; j++)
294 2788 : session->steps[j]->session = i;
295 : }
296 :
297 : /*
298 : * If we have manually-specified permutations, link PermutationSteps to
299 : * Steps, and fill in blocker links.
300 : */
301 2664 : for (i = 0; i < testspec->npermutations; i++)
302 : {
303 2390 : Permutation *p = testspec->permutations[i];
304 :
305 20126 : for (j = 0; j < p->nsteps; j++)
306 : {
307 17736 : PermutationStep *pstep = p->steps[j];
308 17736 : Step **this = (Step **) bsearch(pstep->name,
309 : allsteps,
310 : nallsteps,
311 : sizeof(Step *),
312 : step_bsearch_cmp);
313 :
314 17736 : if (this == NULL)
315 : {
316 0 : fprintf(stderr, "undefined step \"%s\" specified in permutation\n",
317 : pstep->name);
318 0 : exit(1);
319 : }
320 17736 : pstep->step = *this;
321 :
322 : /* Mark the step used, for check below */
323 17736 : pstep->step->used = true;
324 : }
325 :
326 : /*
327 : * Identify any blocker steps. We search only the current
328 : * permutation, since steps not used there couldn't be concurrent.
329 : * Note that it's OK to reference later permutation steps, so this
330 : * can't be combined with the previous loop.
331 : */
332 20126 : for (j = 0; j < p->nsteps; j++)
333 : {
334 17736 : PermutationStep *pstep = p->steps[j];
335 :
336 17870 : for (k = 0; k < pstep->nblockers; k++)
337 : {
338 134 : PermutationStepBlocker *blocker = pstep->blockers[k];
339 : int n;
340 :
341 134 : if (blocker->blocktype == PSB_ONCE)
342 24 : continue; /* nothing to link to */
343 :
344 110 : blocker->step = NULL;
345 542 : for (n = 0; n < p->nsteps; n++)
346 : {
347 542 : PermutationStep *otherp = p->steps[n];
348 :
349 542 : if (strcmp(otherp->name, blocker->stepname) == 0)
350 : {
351 110 : blocker->step = otherp->step;
352 110 : break;
353 : }
354 : }
355 110 : if (blocker->step == NULL)
356 : {
357 0 : fprintf(stderr, "undefined blocking step \"%s\" referenced in permutation step \"%s\"\n",
358 : blocker->stepname, pstep->name);
359 0 : exit(1);
360 : }
361 : /* can't block on completion of step of own session */
362 110 : if (blocker->step->session == pstep->step->session)
363 : {
364 0 : fprintf(stderr, "permutation step \"%s\" cannot block on its own session\n",
365 : pstep->name);
366 0 : exit(1);
367 : }
368 : }
369 : }
370 : }
371 :
372 : /*
373 : * If we have manually-specified permutations, verify that all steps have
374 : * been used, warning about anything defined but not used. We can skip
375 : * this when using automatically-generated permutations.
376 : */
377 274 : if (testspec->permutations)
378 : {
379 2882 : for (i = 0; i < nallsteps; i++)
380 : {
381 2636 : if (!allsteps[i]->used)
382 12 : fprintf(stderr, "unused step name: %s\n", allsteps[i]->name);
383 : }
384 : }
385 :
386 274 : free(allsteps);
387 274 : }
388 :
389 : /*
390 : * Run the permutations specified in the spec, or all if none were
391 : * explicitly specified.
392 : */
393 : static void
394 274 : run_testspec(TestSpec *testspec)
395 : {
396 274 : if (testspec->permutations)
397 246 : run_named_permutations(testspec);
398 : else
399 28 : run_all_permutations(testspec);
400 274 : }
401 :
402 : /*
403 : * Run all permutations of the steps and sessions.
404 : */
405 : static void
406 28 : run_all_permutations(TestSpec *testspec)
407 : {
408 : int nsteps;
409 : int i;
410 : PermutationStep *steps;
411 : PermutationStep **stepptrs;
412 : int *piles;
413 :
414 : /* Count the total number of steps in all sessions */
415 28 : nsteps = 0;
416 88 : for (i = 0; i < testspec->nsessions; i++)
417 60 : nsteps += testspec->sessions[i]->nsteps;
418 :
419 : /* Create PermutationStep workspace array */
420 28 : steps = (PermutationStep *) pg_malloc0(sizeof(PermutationStep) * nsteps);
421 28 : stepptrs = (PermutationStep **) pg_malloc(sizeof(PermutationStep *) * nsteps);
422 180 : for (i = 0; i < nsteps; i++)
423 152 : stepptrs[i] = steps + i;
424 :
425 : /*
426 : * To generate the permutations, we conceptually put the steps of each
427 : * session on a pile. To generate a permutation, we pick steps from the
428 : * piles until all piles are empty. By picking steps from piles in
429 : * different order, we get different permutations.
430 : *
431 : * A pile is actually just an integer which tells how many steps we've
432 : * already picked from this pile.
433 : */
434 28 : piles = pg_malloc(sizeof(int) * testspec->nsessions);
435 88 : for (i = 0; i < testspec->nsessions; i++)
436 60 : piles[i] = 0;
437 :
438 28 : run_all_permutations_recurse(testspec, piles, 0, stepptrs);
439 :
440 28 : free(steps);
441 28 : free(stepptrs);
442 28 : free(piles);
443 28 : }
444 :
445 : static void
446 3120 : run_all_permutations_recurse(TestSpec *testspec, int *piles,
447 : int nsteps, PermutationStep **steps)
448 : {
449 : int i;
450 3120 : bool found = false;
451 :
452 11202 : for (i = 0; i < testspec->nsessions; i++)
453 : {
454 : /* If there's any more steps in this pile, pick it and recurse */
455 8082 : if (piles[i] < testspec->sessions[i]->nsteps)
456 : {
457 3092 : Step *newstep = testspec->sessions[i]->steps[piles[i]];
458 :
459 : /*
460 : * These automatically-generated PermutationSteps never have
461 : * blocker conditions. So we need only fill these fields, relying
462 : * on run_all_permutations() to have zeroed the rest:
463 : */
464 3092 : steps[nsteps]->name = newstep->name;
465 3092 : steps[nsteps]->step = newstep;
466 :
467 3092 : piles[i]++;
468 :
469 3092 : run_all_permutations_recurse(testspec, piles, nsteps + 1, steps);
470 :
471 3092 : piles[i]--;
472 :
473 3092 : found = true;
474 : }
475 : }
476 :
477 : /* If all the piles were empty, this permutation is completed. Run it */
478 3120 : if (!found)
479 972 : run_permutation(testspec, nsteps, steps);
480 3120 : }
481 :
482 : /*
483 : * Run permutations given in the test spec
484 : */
485 : static void
486 246 : run_named_permutations(TestSpec *testspec)
487 : {
488 : int i;
489 :
490 2636 : for (i = 0; i < testspec->npermutations; i++)
491 : {
492 2390 : Permutation *p = testspec->permutations[i];
493 :
494 2390 : run_permutation(testspec, p->nsteps, p->steps);
495 : }
496 246 : }
497 :
498 : static int
499 8526 : step_qsort_cmp(const void *a, const void *b)
500 : {
501 8526 : Step *stepa = *((Step **) a);
502 8526 : Step *stepb = *((Step **) b);
503 :
504 8526 : return strcmp(stepa->name, stepb->name);
505 : }
506 :
507 : static int
508 57930 : step_bsearch_cmp(const void *a, const void *b)
509 : {
510 57930 : char *stepname = (char *) a;
511 57930 : Step *step = *((Step **) b);
512 :
513 57930 : return strcmp(stepname, step->name);
514 : }
515 :
516 : /*
517 : * Run one permutation
518 : */
519 : static void
520 3362 : run_permutation(TestSpec *testspec, int nsteps, PermutationStep **steps)
521 : {
522 : PGresult *res;
523 : int i;
524 3362 : int nwaiting = 0;
525 : PermutationStep **waiting;
526 :
527 3362 : waiting = pg_malloc(sizeof(PermutationStep *) * testspec->nsessions);
528 :
529 3362 : printf("\nstarting permutation:");
530 27310 : for (i = 0; i < nsteps; i++)
531 23948 : printf(" %s", steps[i]->name);
532 3362 : printf("\n");
533 :
534 : /* Perform setup */
535 6768 : for (i = 0; i < testspec->nsetupsqls; i++)
536 : {
537 3406 : res = PQexec(conns[0].conn, testspec->setupsqls[i]);
538 3406 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
539 : {
540 126 : printResultSet(res);
541 : }
542 3280 : else if (PQresultStatus(res) != PGRES_COMMAND_OK)
543 : {
544 0 : fprintf(stderr, "setup failed: %s", PQerrorMessage(conns[0].conn));
545 0 : exit(1);
546 : }
547 3406 : PQclear(res);
548 : }
549 :
550 : /* Perform per-session setup */
551 11416 : for (i = 0; i < testspec->nsessions; i++)
552 : {
553 8054 : if (testspec->sessions[i]->setupsql)
554 : {
555 5158 : res = PQexec(conns[i + 1].conn, testspec->sessions[i]->setupsql);
556 5158 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
557 : {
558 70 : printResultSet(res);
559 : }
560 5088 : else if (PQresultStatus(res) != PGRES_COMMAND_OK)
561 : {
562 0 : fprintf(stderr, "setup of session %s failed: %s",
563 0 : conns[i + 1].sessionname,
564 0 : PQerrorMessage(conns[i + 1].conn));
565 0 : exit(1);
566 : }
567 5158 : PQclear(res);
568 : }
569 : }
570 :
571 : /* Perform steps */
572 27310 : for (i = 0; i < nsteps; i++)
573 : {
574 23948 : PermutationStep *pstep = steps[i];
575 23948 : Step *step = pstep->step;
576 23948 : IsoConnInfo *iconn = &conns[1 + step->session];
577 23948 : PGconn *conn = iconn->conn;
578 : bool mustwait;
579 : int j;
580 :
581 : /*
582 : * Check whether the session that needs to perform the next step is
583 : * still blocked on an earlier step. If so, wait for it to finish.
584 : */
585 23948 : if (iconn->active_step != NULL)
586 : {
587 : struct timeval start_time;
588 :
589 68 : gettimeofday(&start_time, NULL);
590 :
591 136 : while (iconn->active_step != NULL)
592 : {
593 68 : PermutationStep *oldstep = iconn->active_step;
594 :
595 : /*
596 : * Wait for oldstep. But even though we don't use
597 : * STEP_NONBLOCK, it might not complete because of blocker
598 : * conditions.
599 : */
600 68 : if (!try_complete_step(testspec, oldstep, STEP_RETRY))
601 : {
602 : /* Done, so remove oldstep from the waiting[] array. */
603 : int w;
604 :
605 96 : for (w = 0; w < nwaiting; w++)
606 : {
607 96 : if (oldstep == waiting[w])
608 68 : break;
609 : }
610 68 : if (w >= nwaiting)
611 0 : abort(); /* can't happen */
612 68 : if (w + 1 < nwaiting)
613 0 : memmove(&waiting[w], &waiting[w + 1],
614 0 : (nwaiting - (w + 1)) * sizeof(PermutationStep *));
615 68 : nwaiting--;
616 : }
617 :
618 : /*
619 : * Check for other steps that have finished. We should do
620 : * this if oldstep completed, as it might have unblocked
621 : * something. On the other hand, if oldstep hasn't completed,
622 : * we must poll all the active steps in hopes of unblocking
623 : * oldstep. So either way, poll them.
624 : */
625 68 : nwaiting = try_complete_steps(testspec, waiting, nwaiting,
626 : STEP_NONBLOCK | STEP_RETRY);
627 :
628 : /*
629 : * If the target session is still busy, apply a timeout to
630 : * keep from hanging indefinitely, which could happen with
631 : * incorrect blocker annotations. Use the same 2 *
632 : * max_step_wait limit as try_complete_step does for deciding
633 : * to die. (We don't bother with trying to cancel anything,
634 : * since it's unclear what to cancel in this case.)
635 : */
636 68 : if (iconn->active_step != NULL)
637 : {
638 : struct timeval current_time;
639 : int64 td;
640 :
641 0 : gettimeofday(¤t_time, NULL);
642 0 : td = (int64) current_time.tv_sec - (int64) start_time.tv_sec;
643 0 : td *= USECS_PER_SEC;
644 0 : td += (int64) current_time.tv_usec - (int64) start_time.tv_usec;
645 0 : if (td > 2 * max_step_wait)
646 : {
647 0 : fprintf(stderr, "step %s timed out after %d seconds\n",
648 0 : iconn->active_step->name,
649 0 : (int) (td / USECS_PER_SEC));
650 0 : fprintf(stderr, "active steps are:");
651 0 : for (j = 1; j < nconns; j++)
652 : {
653 0 : IsoConnInfo *oconn = &conns[j];
654 :
655 0 : if (oconn->active_step != NULL)
656 0 : fprintf(stderr, " %s",
657 0 : oconn->active_step->name);
658 : }
659 0 : fprintf(stderr, "\n");
660 0 : exit(1);
661 : }
662 : }
663 : }
664 : }
665 :
666 : /* Send the query for this step. */
667 23948 : if (!PQsendQuery(conn, step->sql))
668 : {
669 0 : fprintf(stdout, "failed to send query for step %s: %s\n",
670 : step->name, PQerrorMessage(conn));
671 0 : exit(1);
672 : }
673 :
674 : /* Remember we launched a step. */
675 23948 : iconn->active_step = pstep;
676 :
677 : /* Remember target number of NOTICEs for any blocker conditions. */
678 24082 : for (j = 0; j < pstep->nblockers; j++)
679 : {
680 134 : PermutationStepBlocker *blocker = pstep->blockers[j];
681 :
682 134 : if (blocker->blocktype == PSB_NUM_NOTICES)
683 2 : blocker->target_notices = blocker->num_notices +
684 2 : conns[blocker->step->session + 1].total_notices;
685 : }
686 :
687 : /* Try to complete this step without blocking. */
688 23948 : mustwait = try_complete_step(testspec, pstep, STEP_NONBLOCK);
689 :
690 : /* Check for completion of any steps that were previously waiting. */
691 23948 : nwaiting = try_complete_steps(testspec, waiting, nwaiting,
692 : STEP_NONBLOCK | STEP_RETRY);
693 :
694 : /* If this step is waiting, add it to the array of waiters. */
695 23948 : if (mustwait)
696 1368 : waiting[nwaiting++] = pstep;
697 : }
698 :
699 : /* Wait for any remaining queries. */
700 3362 : nwaiting = try_complete_steps(testspec, waiting, nwaiting, STEP_RETRY);
701 3362 : if (nwaiting != 0)
702 : {
703 0 : fprintf(stderr, "failed to complete permutation due to mutually-blocking steps\n");
704 0 : exit(1);
705 : }
706 :
707 : /* Perform per-session teardown */
708 11416 : for (i = 0; i < testspec->nsessions; i++)
709 : {
710 8054 : if (testspec->sessions[i]->teardownsql)
711 : {
712 430 : res = PQexec(conns[i + 1].conn, testspec->sessions[i]->teardownsql);
713 430 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
714 : {
715 170 : printResultSet(res);
716 : }
717 260 : else if (PQresultStatus(res) != PGRES_COMMAND_OK)
718 : {
719 0 : fprintf(stderr, "teardown of session %s failed: %s",
720 0 : conns[i + 1].sessionname,
721 0 : PQerrorMessage(conns[i + 1].conn));
722 : /* don't exit on teardown failure */
723 : }
724 430 : PQclear(res);
725 : }
726 : }
727 :
728 : /* Perform teardown */
729 3362 : if (testspec->teardownsql)
730 : {
731 3258 : res = PQexec(conns[0].conn, testspec->teardownsql);
732 3258 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
733 : {
734 80 : printResultSet(res);
735 : }
736 3178 : else if (PQresultStatus(res) != PGRES_COMMAND_OK)
737 : {
738 0 : fprintf(stderr, "teardown failed: %s",
739 0 : PQerrorMessage(conns[0].conn));
740 : /* don't exit on teardown failure */
741 : }
742 3258 : PQclear(res);
743 : }
744 :
745 3362 : free(waiting);
746 3362 : }
747 :
748 : /*
749 : * Check for completion of any waiting step(s).
750 : * Remove completed ones from the waiting[] array,
751 : * and return the new value of nwaiting.
752 : * See try_complete_step for the meaning of the flags.
753 : */
754 : static int
755 27390 : try_complete_steps(TestSpec *testspec, PermutationStep **waiting,
756 : int nwaiting, int flags)
757 : {
758 : int old_nwaiting;
759 : bool have_blocker;
760 :
761 : do
762 : {
763 27390 : int w = 0;
764 :
765 : /* Reset latch; we only care about notices received within loop. */
766 27390 : any_new_notice = false;
767 :
768 : /* Likewise, these variables reset for each retry. */
769 27390 : old_nwaiting = nwaiting;
770 27390 : have_blocker = false;
771 :
772 : /* Scan the array, try to complete steps. */
773 29726 : while (w < nwaiting)
774 : {
775 2336 : if (try_complete_step(testspec, waiting[w], flags))
776 : {
777 : /* Still blocked, leave it alone. */
778 1036 : if (waiting[w]->nblockers > 0)
779 102 : have_blocker = true;
780 1036 : w++;
781 : }
782 : else
783 : {
784 : /* Done, remove it from array. */
785 1300 : if (w + 1 < nwaiting)
786 48 : memmove(&waiting[w], &waiting[w + 1],
787 48 : (nwaiting - (w + 1)) * sizeof(PermutationStep *));
788 1300 : nwaiting--;
789 : }
790 : }
791 :
792 : /*
793 : * If any of the still-waiting steps have blocker conditions attached,
794 : * it's possible that one of the steps we examined afterwards has
795 : * released them (either by completing, or by sending a NOTICE). If
796 : * any step completions or NOTICEs happened, repeat the loop until
797 : * none occurs. Without this provision, completion timing could vary
798 : * depending on the order in which the steps appear in the array.
799 : */
800 27390 : } while (have_blocker && (nwaiting < old_nwaiting || any_new_notice));
801 27378 : return nwaiting;
802 : }
803 :
804 : /*
805 : * Our caller already sent the query associated with this step. Wait for it
806 : * to either complete, or hit a blocking condition.
807 : *
808 : * When calling this function on behalf of a given step for a second or later
809 : * time, pass the STEP_RETRY flag. Do not pass it on the first call.
810 : *
811 : * Returns true if the step was *not* completed, false if it was completed.
812 : * Reasons for non-completion are (a) the STEP_NONBLOCK flag was specified
813 : * and the query is waiting to acquire a lock, or (b) the step has an
814 : * unsatisfied blocker condition. When STEP_NONBLOCK is given, we assume
815 : * that any lock wait will persist until we have executed additional steps.
816 : */
817 : static bool
818 26352 : try_complete_step(TestSpec *testspec, PermutationStep *pstep, int flags)
819 : {
820 26352 : Step *step = pstep->step;
821 26352 : IsoConnInfo *iconn = &conns[1 + step->session];
822 26352 : PGconn *conn = iconn->conn;
823 : fd_set read_set;
824 : struct timeval start_time;
825 : struct timeval timeout;
826 26352 : int sock = PQsocket(conn);
827 : int ret;
828 : PGresult *res;
829 : PGnotify *notify;
830 26352 : bool canceled = false;
831 :
832 : /*
833 : * If the step is annotated with (*), then on the first call, force it to
834 : * wait. This is useful for ensuring consistent output when the step
835 : * might or might not complete so fast that we don't observe it waiting.
836 : */
837 26352 : if (!(flags & STEP_RETRY))
838 : {
839 : int i;
840 :
841 24058 : for (i = 0; i < pstep->nblockers; i++)
842 : {
843 134 : PermutationStepBlocker *blocker = pstep->blockers[i];
844 :
845 134 : if (blocker->blocktype == PSB_ONCE)
846 : {
847 24 : printf("step %s: %s <waiting ...>\n",
848 : step->name, step->sql);
849 24 : return true;
850 : }
851 : }
852 : }
853 :
854 26328 : if (sock < 0)
855 : {
856 0 : fprintf(stderr, "invalid socket: %s", PQerrorMessage(conn));
857 0 : exit(1);
858 : }
859 :
860 26328 : gettimeofday(&start_time, NULL);
861 26328 : FD_ZERO(&read_set);
862 :
863 51950 : while (PQisBusy(conn))
864 : {
865 27938 : FD_SET(sock, &read_set);
866 27938 : timeout.tv_sec = 0;
867 27938 : timeout.tv_usec = 10000; /* Check for lock waits every 10ms. */
868 :
869 27938 : ret = select(sock + 1, &read_set, NULL, NULL, &timeout);
870 27938 : if (ret < 0) /* error in select() */
871 : {
872 0 : if (errno == EINTR)
873 0 : continue;
874 0 : fprintf(stderr, "select failed: %m\n");
875 0 : exit(1);
876 : }
877 27938 : else if (ret == 0) /* select() timeout: check for lock wait */
878 : {
879 : struct timeval current_time;
880 : int64 td;
881 :
882 : /* If it's OK for the step to block, check whether it has. */
883 3342 : if (flags & STEP_NONBLOCK)
884 : {
885 : bool waiting;
886 :
887 3334 : res = PQexecPrepared(conns[0].conn, PREP_WAITING, 1,
888 3334 : &conns[step->session + 1].backend_pid_str,
889 : NULL, NULL, 0);
890 6668 : if (PQresultStatus(res) != PGRES_TUPLES_OK ||
891 3334 : PQntuples(res) != 1)
892 : {
893 0 : fprintf(stderr, "lock wait query failed: %s",
894 0 : PQerrorMessage(conns[0].conn));
895 0 : exit(1);
896 : }
897 3334 : waiting = ((PQgetvalue(res, 0, 0))[0] == 't');
898 3334 : PQclear(res);
899 :
900 3334 : if (waiting) /* waiting to acquire a lock */
901 : {
902 : /*
903 : * Since it takes time to perform the lock-check query,
904 : * some data --- notably, NOTICE messages --- might have
905 : * arrived since we looked. We must call PQconsumeInput
906 : * and then PQisBusy to collect and process any such
907 : * messages. In the (unlikely) case that PQisBusy then
908 : * returns false, we might as well go examine the
909 : * available result.
910 : */
911 2316 : if (!PQconsumeInput(conn))
912 : {
913 0 : fprintf(stderr, "PQconsumeInput failed: %s\n",
914 : PQerrorMessage(conn));
915 0 : exit(1);
916 : }
917 2316 : if (!PQisBusy(conn))
918 0 : break;
919 :
920 : /*
921 : * conn is still busy, so conclude that the step really is
922 : * waiting.
923 : */
924 2316 : if (!(flags & STEP_RETRY))
925 1282 : printf("step %s: %s <waiting ...>\n",
926 : step->name, step->sql);
927 2316 : return true;
928 : }
929 : /* else, not waiting */
930 : }
931 :
932 : /* Figure out how long we've been waiting for this step. */
933 1026 : gettimeofday(¤t_time, NULL);
934 1026 : td = (int64) current_time.tv_sec - (int64) start_time.tv_sec;
935 1026 : td *= USECS_PER_SEC;
936 1026 : td += (int64) current_time.tv_usec - (int64) start_time.tv_usec;
937 :
938 : /*
939 : * After max_step_wait microseconds, try to cancel the query.
940 : *
941 : * If the user tries to test an invalid permutation, we don't want
942 : * to hang forever, especially when this is running in the
943 : * buildfarm. This will presumably lead to this permutation
944 : * failing, but remaining permutations and tests should still be
945 : * OK.
946 : */
947 1026 : if (td > max_step_wait && !canceled)
948 : {
949 0 : PGcancelConn *cancel_conn = PQcancelCreate(conn);
950 :
951 0 : if (PQcancelBlocking(cancel_conn))
952 : {
953 : /*
954 : * print to stdout not stderr, as this should appear in
955 : * the test case's results
956 : */
957 0 : printf("isolationtester: canceling step %s after %d seconds\n",
958 : step->name, (int) (td / USECS_PER_SEC));
959 0 : canceled = true;
960 : }
961 : else
962 0 : fprintf(stderr, "PQcancel failed: %s\n", PQcancelErrorMessage(cancel_conn));
963 0 : PQcancelFinish(cancel_conn);
964 : }
965 :
966 : /*
967 : * After twice max_step_wait, just give up and die.
968 : *
969 : * Since cleanup steps won't be run in this case, this may cause
970 : * later tests to fail. That stinks, but it's better than waiting
971 : * forever for the server to respond to the cancel.
972 : */
973 1026 : if (td > 2 * max_step_wait)
974 : {
975 0 : fprintf(stderr, "step %s timed out after %d seconds\n",
976 0 : step->name, (int) (td / USECS_PER_SEC));
977 0 : exit(1);
978 : }
979 : }
980 24596 : else if (!PQconsumeInput(conn)) /* select(): data available */
981 : {
982 0 : fprintf(stderr, "PQconsumeInput failed: %s\n",
983 : PQerrorMessage(conn));
984 0 : exit(1);
985 : }
986 : }
987 :
988 : /*
989 : * The step is done, but we won't report it as complete so long as there
990 : * are blockers.
991 : */
992 24012 : if (step_has_blocker(pstep))
993 : {
994 64 : if (!(flags & STEP_RETRY))
995 62 : printf("step %s: %s <waiting ...>\n",
996 : step->name, step->sql);
997 64 : return true;
998 : }
999 :
1000 : /* Otherwise, go ahead and complete it. */
1001 23948 : if (flags & STEP_RETRY)
1002 1368 : printf("step %s: <... completed>\n", step->name);
1003 : else
1004 22580 : printf("step %s: %s\n", step->name, step->sql);
1005 :
1006 48492 : while ((res = PQgetResult(conn)))
1007 : {
1008 24544 : switch (PQresultStatus(res))
1009 : {
1010 16664 : case PGRES_COMMAND_OK:
1011 : case PGRES_EMPTY_QUERY:
1012 16664 : break;
1013 6930 : case PGRES_TUPLES_OK:
1014 6930 : printResultSet(res);
1015 6930 : break;
1016 950 : case PGRES_FATAL_ERROR:
1017 :
1018 : /*
1019 : * Detail may contain XID values, so we want to just show
1020 : * primary. Beware however that libpq-generated error results
1021 : * may not contain subfields, only an old-style message.
1022 : */
1023 : {
1024 950 : const char *sev = PQresultErrorField(res,
1025 : PG_DIAG_SEVERITY);
1026 950 : const char *msg = PQresultErrorField(res,
1027 : PG_DIAG_MESSAGE_PRIMARY);
1028 :
1029 950 : if (sev && msg)
1030 946 : printf("%s: %s\n", sev, msg);
1031 : else
1032 4 : printf("%s\n", PQresultErrorMessage(res));
1033 : }
1034 950 : break;
1035 0 : default:
1036 0 : printf("unexpected result status: %s\n",
1037 : PQresStatus(PQresultStatus(res)));
1038 : }
1039 24544 : PQclear(res);
1040 : }
1041 :
1042 : /* Report any available NOTIFY messages, too */
1043 23948 : PQconsumeInput(conn);
1044 24002 : while ((notify = PQnotifies(conn)) != NULL)
1045 : {
1046 : /* Try to identify which session it came from */
1047 54 : const char *sendername = NULL;
1048 : char pidstring[32];
1049 : int i;
1050 :
1051 54 : for (i = 0; i < testspec->nsessions; i++)
1052 : {
1053 54 : if (notify->be_pid == conns[i + 1].backend_pid)
1054 : {
1055 54 : sendername = conns[i + 1].sessionname;
1056 54 : break;
1057 : }
1058 : }
1059 54 : if (sendername == NULL)
1060 : {
1061 : /* Doesn't seem to be any test session, so show the hard way */
1062 0 : snprintf(pidstring, sizeof(pidstring), "PID %d", notify->be_pid);
1063 0 : sendername = pidstring;
1064 : }
1065 54 : printf("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n",
1066 : testspec->sessions[step->session]->name,
1067 : notify->relname, notify->extra, sendername);
1068 54 : PQfreemem(notify);
1069 54 : PQconsumeInput(conn);
1070 : }
1071 :
1072 : /* Connection is now idle. */
1073 23948 : iconn->active_step = NULL;
1074 :
1075 23948 : return false;
1076 : }
1077 :
1078 : /* Detect whether a step has any unsatisfied blocker conditions */
1079 : static bool
1080 24012 : step_has_blocker(PermutationStep *pstep)
1081 : {
1082 : int i;
1083 :
1084 24146 : for (i = 0; i < pstep->nblockers; i++)
1085 : {
1086 198 : PermutationStepBlocker *blocker = pstep->blockers[i];
1087 : IsoConnInfo *iconn;
1088 :
1089 198 : switch (blocker->blocktype)
1090 : {
1091 24 : case PSB_ONCE:
1092 : /* Ignore; try_complete_step handles this specially */
1093 24 : break;
1094 172 : case PSB_OTHER_STEP:
1095 : /* Block if referenced step is active */
1096 172 : iconn = &conns[1 + blocker->step->session];
1097 172 : if (iconn->active_step &&
1098 64 : iconn->active_step->step == blocker->step)
1099 64 : return true;
1100 108 : break;
1101 2 : case PSB_NUM_NOTICES:
1102 : /* Block if not enough notices received yet */
1103 2 : iconn = &conns[1 + blocker->step->session];
1104 2 : if (iconn->total_notices < blocker->target_notices)
1105 0 : return true;
1106 2 : break;
1107 : }
1108 134 : }
1109 23948 : return false;
1110 : }
1111 :
1112 : static void
1113 7376 : printResultSet(PGresult *res)
1114 : {
1115 : PQprintOpt popt;
1116 :
1117 7376 : memset(&popt, 0, sizeof(popt));
1118 7376 : popt.header = true;
1119 7376 : popt.align = true;
1120 7376 : popt.fieldSep = "|";
1121 7376 : PQprint(stdout, res, &popt);
1122 7376 : }
1123 :
1124 : /* notice processor for regular user sessions */
1125 : static void
1126 1090 : isotesterNoticeProcessor(void *arg, const char *message)
1127 : {
1128 1090 : IsoConnInfo *myconn = (IsoConnInfo *) arg;
1129 :
1130 : /* Prefix the backend's message with the session name. */
1131 1090 : printf("%s: %s", myconn->sessionname, message);
1132 : /* Record notices, since we may need this to decide to unblock a step. */
1133 1090 : myconn->total_notices++;
1134 1090 : any_new_notice = true;
1135 1090 : }
1136 :
/*
 * blackholeNoticeProcessor
 *		Notice processor that silently discards every message it receives.
 */
static void
blackholeNoticeProcessor(void *arg, const char *message)
{
	/* Intentionally ignore both the context pointer and the message. */
	(void) arg;
	(void) message;
}
|