From 59a9a03da728d53364f9c3d6fe8b48e21697b93e Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Mon, 6 Apr 2020 21:28:55 -0700 Subject: [PATCH v7 12/12] WIP: pgbench --- src/bin/pgbench/pgbench.c | 107 +++++++++++++++++++++++++++++--------- 1 file changed, 83 insertions(+), 24 deletions(-) diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index e99af801675..21d1ab2aac1 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -310,6 +310,10 @@ typedef struct RandomState /* Various random sequences are initialized from this one. */ static RandomState base_random_sequence; +#ifdef ENABLE_THREAD_SAFETY +pthread_barrier_t conn_barrier; +#endif + /* * Connection state machine states. */ @@ -5206,6 +5210,10 @@ printResults(StatsData *total, instr_time total_time, tps_exclude = ntx / (time_include - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients)); + //fprintf(stderr, "time: include: %f, exclude: %f, conn total: %f\n", + // time_include, time_include - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients), + // INSTR_TIME_GET_DOUBLE(conn_total_time)); + /* Report test parameters. */ printf("transaction type: %s\n", num_scripts == 1 ? sql_script[0].desc : "multiple scripts"); @@ -6126,26 +6134,14 @@ main(int argc, char **argv) /* all clients must be assigned to a thread */ Assert(nclients_dealt == nclients); - /* get start up time */ - INSTR_TIME_SET_CURRENT(start_time); - - /* set alarm if duration is specified. */ - if (duration > 0) - setalarm(duration); - /* start threads */ #ifdef ENABLE_THREAD_SAFETY + pthread_barrier_init(&conn_barrier, NULL, nthreads); + for (i = 0; i < nthreads; i++) { TState *thread = &threads[i]; - INSTR_TIME_SET_CURRENT(thread->start_time); - - /* compute when to stop */ - if (duration > 0) - end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) + - (int64) 1000000 * duration; - /* the first thread (i = 0) is executed by main thread */ if (i > 0) { @@ -6162,13 +6158,38 @@ main(int argc, char **argv) thread->thread = INVALID_THREAD; } } -#else - INSTR_TIME_SET_CURRENT(threads[0].start_time); - /* compute when to stop */ +#endif /* ENABLE_THREAD_SAFETY */ + +#ifdef ENABLE_THREAD_SAFETY + /* wait till all threads started (threads wait in threadRun()) */ + //fprintf(stderr, "andres: waiting for thread start: %u\n", threads[0].tid); + pthread_barrier_wait(&conn_barrier); +#endif /* ENABLE_THREAD_SAFETY */ + + /* get start up time */ + INSTR_TIME_SET_CURRENT(start_time); + + /* */ + for (i = 0; i < nthreads; i++) + { + TState *thread = &threads[i]; + + thread->start_time = start_time; + + /* compute when to stop */ + if (duration > 0) + end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) + + (int64) 1000000 * duration; + } + + /* set alarm if duration is specified. */ if (duration > 0) - end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) + - (int64) 1000000 * duration; - threads[0].thread = INVALID_THREAD; + setalarm(duration); + +#ifdef ENABLE_THREAD_SAFETY + /* updated start time (threads wait in threadRun()) */ + //fprintf(stderr, "andres: %u: waiting for start time\n", threads[0].tid); + pthread_barrier_wait(&conn_barrier); #endif /* ENABLE_THREAD_SAFETY */ /* wait for threads and accumulate results */ @@ -6236,12 +6257,30 @@ threadRun(void *arg) int i; /* for reporting progress: */ - int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time); - int64 last_report = thread_start; - int64 next_report = last_report + (int64) progress * 1000000; + int64 thread_start; + int64 last_report; + int64 next_report; StatsData last, aggs; + /* wait till all threads started (main waits outside) */ + if (thread->tid != 0) + { + //fprintf(stderr, "andres: %u: waiting for thread start\n", thread->tid); + pthread_barrier_wait(&conn_barrier); + } + + /* wait for start time to be initialized (main waits outside) */ + if (thread->tid != 0) + { + //fprintf(stderr, "andres: %u: waiting for start time\n", thread->tid); + pthread_barrier_wait(&conn_barrier); + } + + thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time); + last_report = thread_start; + next_report = last_report + (int64) progress * 1000000; + /* * Initialize throttling rate target for all of the thread's clients. It * might be a little more accurate to reset thread->start_time here too. @@ -6288,7 +6327,27 @@ threadRun(void *arg) /* time after thread and connections set up */ INSTR_TIME_SET_CURRENT(thread->conn_time); - INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time); + INSTR_TIME_SUBTRACT(thread->conn_time, start); + + // e = thread->conn_time; + //fprintf(stderr, "andres: %u: connection established in %f (s %f, e %f)\n", + // thread->tid, INSTR_TIME_GET_DOUBLE(thread->conn_time), + // INSTR_TIME_GET_DOUBLE(e), + // INSTR_TIME_GET_DOUBLE(start)); + + /* add once for each other connection */ + if (!is_connect) + { + instr_time e = thread->conn_time; + for (i = 0; i < (nstate - 1); i++) + { + INSTR_TIME_ADD(thread->conn_time, e); + } + } + + /* wait for all connections to be established */ + //fprintf(stderr, "andres: %u: waiting for connection establishment\n", thread->tid); + pthread_barrier_wait(&conn_barrier); /* explicitly initialize the state machines */ for (i = 0; i < nstate; i++) -- 2.25.0.114.g5b0ca878e0