From d615a3e2cdc6949f4fae83a4a7a7328937f7acbf Mon Sep 17 00:00:00 2001 From: Marina Polyakova Date: Tue, 4 Sep 2018 19:02:32 +0300 Subject: [PATCH v11 1/4] Pgbench errors: use the RandomState structure for thread/client random seed. This is most important when it is used to reset a client's random seed when transactions are retried after serialization/deadlock failures. Use the random state of the client for random functions PGBENCH_RANDOM_* during the execution of the script. Use a separate random state for each thread option (choosing the script / getting the throttle delay / logging with a sample rate) to make all of them independent of each other and therefore deterministic at the thread level. --- src/bin/pgbench/pgbench.c | 104 +++++++++++++++++++++++++++----------- 1 file changed, 74 insertions(+), 30 deletions(-) diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 41b756c089..988e37bce5 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -250,6 +250,14 @@ typedef struct StatsData SimpleStats lag; } StatsData; +/* + * Data structure for thread/client random seed. + */ +typedef struct +{ + unsigned short xseed[3]; +} RandomState; + /* * Connection state machine states. */ @@ -331,6 +339,12 @@ typedef struct ConnectionStateEnum state; /* state machine's current state. */ ConditionalStack cstack; /* enclosing conditionals state */ + /* + * Separate randomness for each client. This is used for random functions + * PGBENCH_RANDOM_* during the execution of the script. + */ + RandomState random_state; + int use_file; /* index in sql_script for this client */ int command; /* command number in script */ @@ -390,7 +404,16 @@ typedef struct pthread_t thread; /* thread handle */ CState *state; /* array of CState */ int nstate; /* length of state[] */ - unsigned short random_state[3]; /* separate randomness for each thread */ + + /* + * Separate randomness for each thread. 
Each thread option uses its own + * random state to make all of them independent of each other and therefore + * deterministic at the thread level. + */ + RandomState choose_script_rs; /* random state for selecting a script */ + RandomState throttling_rs; /* random state for transaction throttling */ + RandomState sampling_rs; /* random state for log sampling */ + int64 throttle_trigger; /* previous/next throttling (us) */ FILE *logfile; /* where to log, or NULL */ ZipfCache zipf_cache; /* for thread-safe zipfian random number @@ -694,7 +717,7 @@ gotdigits: /* random number generator: uniform distribution from min to max inclusive */ static int64 -getrand(TState *thread, int64 min, int64 max) +getrand(RandomState *random_state, int64 min, int64 max) { /* * Odd coding is so that min and max have approximately the same chance of @@ -705,7 +728,7 @@ getrand(TState *thread, int64 min, int64 max) * protected by a mutex, and therefore a bottleneck on machines with many * CPUs. */ - return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state)); + return min + (int64) ((max - min + 1) * pg_erand48(random_state->xseed)); } /* @@ -714,7 +737,8 @@ getrand(TState *thread, int64 min, int64 max) * value is exp(-parameter). 
*/ static int64 -getExponentialRand(TState *thread, int64 min, int64 max, double parameter) +getExponentialRand(RandomState *random_state, int64 min, int64 max, + double parameter) { double cut, uniform, @@ -724,7 +748,7 @@ getExponentialRand(TState *thread, int64 min, int64 max, double parameter) Assert(parameter > 0.0); cut = exp(-parameter); /* erand in [0, 1), uniform in (0, 1] */ - uniform = 1.0 - pg_erand48(thread->random_state); + uniform = 1.0 - pg_erand48(random_state->xseed); /* * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1) @@ -737,7 +761,8 @@ getExponentialRand(TState *thread, int64 min, int64 max, double parameter) /* random number generator: gaussian distribution from min to max inclusive */ static int64 -getGaussianRand(TState *thread, int64 min, int64 max, double parameter) +getGaussianRand(RandomState *random_state, int64 min, int64 max, + double parameter) { double stdev; double rand; @@ -765,8 +790,8 @@ getGaussianRand(TState *thread, int64 min, int64 max, double parameter) * are expected in (0, 1] (see * https://en.wikipedia.org/wiki/Box-Muller_transform) */ - double rand1 = 1.0 - pg_erand48(thread->random_state); - double rand2 = 1.0 - pg_erand48(thread->random_state); + double rand1 = 1.0 - pg_erand48(random_state->xseed); + double rand2 = 1.0 - pg_erand48(random_state->xseed); /* Box-Muller basic form transform */ double var_sqrt = sqrt(-2.0 * log(rand1)); @@ -793,7 +818,7 @@ getGaussianRand(TState *thread, int64 min, int64 max, double parameter) * will approximate a Poisson distribution centered on the given value. 
*/ static int64 -getPoissonRand(TState *thread, int64 center) +getPoissonRand(RandomState *random_state, int64 center) { /* * Use inverse transform sampling to generate a value > 0, such that the @@ -802,7 +827,7 @@ getPoissonRand(TState *thread, int64 center) double uniform; /* erand in [0, 1), uniform in (0, 1] */ - uniform = 1.0 - pg_erand48(thread->random_state); + uniform = 1.0 - pg_erand48(random_state->xseed); return (int64) (-log(uniform) * ((double) center) + 0.5); } @@ -880,7 +905,7 @@ zipfFindOrCreateCacheCell(ZipfCache *cache, int64 n, double s) * Luc Devroye, p. 550-551, Springer 1986. */ static int64 -computeIterativeZipfian(TState *thread, int64 n, double s) +computeIterativeZipfian(RandomState *random_state, int64 n, double s) { double b = pow(2.0, s - 1.0); double x, @@ -891,8 +916,8 @@ computeIterativeZipfian(TState *thread, int64 n, double s) while (true) { /* random variates */ - u = pg_erand48(thread->random_state); - v = pg_erand48(thread->random_state); + u = pg_erand48(random_state->xseed); + v = pg_erand48(random_state->xseed); x = floor(pow(u, -1.0 / (s - 1.0))); @@ -910,10 +935,11 @@ computeIterativeZipfian(TState *thread, int64 n, double s) * Jim Gray et al, SIGMOD 1994 */ static int64 -computeHarmonicZipfian(TState *thread, int64 n, double s) +computeHarmonicZipfian(ZipfCache *zipf_cache, RandomState *random_state, + int64 n, double s) { - ZipfCell *cell = zipfFindOrCreateCacheCell(&thread->zipf_cache, n, s); - double uniform = pg_erand48(thread->random_state); + ZipfCell *cell = zipfFindOrCreateCacheCell(zipf_cache, n, s); + double uniform = pg_erand48(random_state->xseed); double uz = uniform * cell->harmonicn; if (uz < 1.0) @@ -925,7 +951,8 @@ computeHarmonicZipfian(TState *thread, int64 n, double s) /* random number generator: zipfian distribution from min to max inclusive */ static int64 -getZipfianRand(TState *thread, int64 min, int64 max, double s) +getZipfianRand(ZipfCache *zipf_cache, RandomState *random_state, int64 min, + 
int64 max, double s) { int64 n = max - min + 1; @@ -934,8 +961,8 @@ getZipfianRand(TState *thread, int64 min, int64 max, double s) return min - 1 + ((s > 1) - ? computeIterativeZipfian(thread, n, s) - : computeHarmonicZipfian(thread, n, s)); + ? computeIterativeZipfian(random_state, n, s) + : computeHarmonicZipfian(zipf_cache, random_state, n, s)); } /* @@ -2209,7 +2236,7 @@ evalStandardFunc(TState *thread, CState *st, if (func == PGBENCH_RANDOM) { Assert(nargs == 2); - setIntValue(retval, getrand(thread, imin, imax)); + setIntValue(retval, getrand(&st->random_state, imin, imax)); } else /* gaussian & exponential */ { @@ -2231,7 +2258,8 @@ evalStandardFunc(TState *thread, CState *st, } setIntValue(retval, - getGaussianRand(thread, imin, imax, param)); + getGaussianRand(&st->random_state, imin, + imax, param)); } else if (func == PGBENCH_RANDOM_ZIPFIAN) { @@ -2243,7 +2271,9 @@ evalStandardFunc(TState *thread, CState *st, return false; } setIntValue(retval, - getZipfianRand(thread, imin, imax, param)); + getZipfianRand(&thread->zipf_cache, + &st->random_state, imin, + imax, param)); } else /* exponential */ { @@ -2256,7 +2286,8 @@ evalStandardFunc(TState *thread, CState *st, } setIntValue(retval, - getExponentialRand(thread, imin, imax, param)); + getExponentialRand(&st->random_state, imin, + imax, param)); } } @@ -2551,7 +2582,7 @@ chooseScript(TState *thread) if (num_scripts == 1) return 0; - w = getrand(thread, 0, total_weight - 1); + w = getrand(&thread->choose_script_rs, 0, total_weight - 1); do { w -= sql_script[i++].weight; @@ -2745,7 +2776,7 @@ doCustom(TState *thread, CState *st, StatsData *agg) * away. 
*/ Assert(throttle_delay > 0); - wait = getPoissonRand(thread, throttle_delay); + wait = getPoissonRand(&thread->throttling_rs, throttle_delay); thread->throttle_trigger += wait; st->txn_scheduled = thread->throttle_trigger; @@ -2779,7 +2810,8 @@ doCustom(TState *thread, CState *st, StatsData *agg) { processXactStats(thread, st, &now, true, agg); /* next rendez-vous */ - wait = getPoissonRand(thread, throttle_delay); + wait = getPoissonRand(&thread->throttling_rs, + throttle_delay); thread->throttle_trigger += wait; st->txn_scheduled = thread->throttle_trigger; } @@ -3322,7 +3354,7 @@ doLog(TState *thread, CState *st, * to the random sample. */ if (sample_rate != 0.0 && - pg_erand48(thread->random_state) > sample_rate) + pg_erand48(thread->sampling_rs.xseed) > sample_rate) return; /* should we aggregate the results or not? */ @@ -4750,6 +4782,17 @@ set_random_seed(const char *seed) return true; } +/* + * Initialize the random state of the client/thread. + */ +static void +initRandomState(RandomState *random_state) +{ + random_state->xseed[0] = random(); + random_state->xseed[1] = random(); + random_state->xseed[2] = random(); +} + int main(int argc, char **argv) @@ -5358,6 +5401,7 @@ main(int argc, char **argv) for (i = 0; i < nclients; i++) { state[i].cstack = conditional_stack_create(); + initRandomState(&state[i].random_state); } if (debug) @@ -5491,9 +5535,9 @@ main(int argc, char **argv) thread->state = &state[nclients_dealt]; thread->nstate = (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i); - thread->random_state[0] = random(); - thread->random_state[1] = random(); - thread->random_state[2] = random(); + initRandomState(&thread->choose_script_rs); + initRandomState(&thread->throttling_rs); + initRandomState(&thread->sampling_rs); thread->logfile = NULL; /* filled in later */ thread->latency_late = 0; thread->zipf_cache.nb_cells = 0; -- 2.17.1