diff --git a/bench/db_bench_cascadb.cpp b/bench/db_bench_cascadb.cpp index 10a1fa7..6fda0a5 100644 --- a/bench/db_bench_cascadb.cpp +++ b/bench/db_bench_cascadb.cpp @@ -32,10 +32,10 @@ using namespace cascadb; static const char* FLAGS_benchmarks = "fillseq," "readrandom," - "readseq," + "readhot," "fillrandom," "readrandom," - "readseq," + "readhot," ; // Number of key/values to place in database @@ -44,6 +44,9 @@ static size_t FLAGS_num = 1000000; // Number of read operations to do. If zero, do FLAGS_num reads. static size_t FLAGS_reads = 0; +// Number of concurrent threads to run. +static int FLAGS_threads = 1; + // Size of each value static size_t FLAGS_value_size = 100; @@ -106,7 +109,6 @@ static void DBSynchronize(DB* db) db->flush(); } -inline static Slice TrimSpace(Slice s) { size_t start = 0; while (start < s.size() && isspace(s[start])) { @@ -119,6 +121,155 @@ static Slice TrimSpace(Slice s) { return Slice(s.data() + start, limit - start); } +static void AppendWithSpace(std::string* str, Slice msg) { + if (msg.empty()) return; + if (!str->empty()) { + str->push_back(' '); + } + str->append(msg.data(), msg.size()); +} + +class Stats { + private: + double start_; + double finish_; + double seconds_; + int done_; + int next_report_; + int64_t bytes_; + double last_op_finish_; + Histogram hist_; + std::string message_; + + public: + Stats() { Start(); } + + void Start() { + next_report_ = 100; + last_op_finish_ = start_; + hist_.Clear(); + done_ = 0; + bytes_ = 0; + seconds_ = 0; + start_ = now_micros(); + finish_ = start_; + message_.clear(); + } + + void Merge(const Stats& other) { + hist_.Merge(other.hist_); + done_ += other.done_; + bytes_ += other.bytes_; + seconds_ += other.seconds_; + if (other.start_ < start_) start_ = other.start_; + if (other.finish_ > finish_) finish_ = other.finish_; + + // Just keep the messages from one thread + if (message_.empty()) message_ = other.message_; + } + + void Stop() { + finish_ = now_micros(); + seconds_ = (finish_ - start_) * 1e-6; + } + + void AddMessage(Slice msg) { + AppendWithSpace(&message_, msg); + } + + void FinishedSingleOp() { + if (FLAGS_histogram) { + double now = now_micros(); + double micros = now - last_op_finish_; + hist_.Add(micros); + if (micros > 20000) { + fprintf(stderr, "long op: %.1f micros%30s\r", micros, ""); + fflush(stderr); + } + last_op_finish_ = now; + } + + done_++; + if (done_ >= next_report_) { + if (next_report_ < 1000) next_report_ += 100; + else if (next_report_ < 5000) next_report_ += 500; + else if (next_report_ < 10000) next_report_ += 1000; + else if (next_report_ < 50000) next_report_ += 5000; + else if (next_report_ < 100000) next_report_ += 10000; + else if (next_report_ < 500000) next_report_ += 50000; + else next_report_ += 100000; + fprintf(stderr, "... finished %d ops%30s\r", done_, ""); + fflush(stderr); + } + } + + void AddBytes(int64_t n) { + bytes_ += n; + } + + void Report(const Slice& name) { + // Pretend at least one op was done in case we are running a benchmark + // that does not call FinishedSingleOp(). + if (done_ < 1) done_ = 1; + + double elapsed = (finish_ - start_) * 1e-6; + + std::string extra; + if (bytes_ > 0) { + // Rate is computed on actual elapsed time, not the sum of per-thread + // elapsed times. + char rate[100]; + snprintf(rate, sizeof(rate), "%6.1f MB/s", + (bytes_ / 1048576.0) / elapsed); + extra = rate; + } + AppendWithSpace(&extra, message_); + + //should computed on actual elapsed time too + fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n", + name.to_string().c_str(), + elapsed * 1e6 / done_, + (extra.empty() ? "" : " "), + extra.c_str()); + if (FLAGS_histogram) { + fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); + } + fflush(stdout); + } +}; + +// State shared by all concurrent executions of the same benchmark. +struct SharedState { + Mutex mu; + CondVar cv; + int total; + + // Each thread goes through the following states: + // (1) initializing + // (2) waiting for others to be initialized + // (3) running + // (4) done + + int num_initialized; + int num_done; + bool start; + + SharedState() : cv(&mu) { } +}; + +// Per-thread state for concurrent executions of the same benchmark. +struct ThreadState { + int tid; // 0..n-1 when running in n threads + Random rand; // Has different seeds for different threads + Stats stats; + SharedState* shared; + + ThreadState(int index) + : tid(index), + rand(1000 + index) { + } +}; + class Benchmark { private: Directory *directory_; @@ -217,71 +368,6 @@ class Benchmark { #endif } - void Start() { - start_ = now_micros() * 1e-6; - bytes_ = 0; - message_.clear(); - last_op_finish_ = start_; - hist_.Clear(); - done_ = 0; - next_report_ = 100; - } - - void FinishedSingleOp() { - if (FLAGS_histogram) { - double now = now_micros() * 1e-6; - double micros = (now - last_op_finish_) * 1e6; - hist_.Add(micros); - if (micros > 20000) { - fprintf(stderr, "long op: %.1f micros%30s\r", micros, ""); - fflush(stderr); - } - last_op_finish_ = now; - } - - done_++; - if (done_ >= next_report_) { - if (next_report_ < 1000) next_report_ += 100; - else if (next_report_ < 5000) next_report_ += 500; - else if (next_report_ < 10000) next_report_ += 1000; - else if (next_report_ < 50000) next_report_ += 5000; - else if (next_report_ < 100000) next_report_ += 10000; - else if (next_report_ < 500000) next_report_ += 50000; - else next_report_ += 100000; - fprintf(stderr, "... finished %d ops%30s\r", done_, ""); - fflush(stderr); - } - } - - void Stop(const Slice& name) { - double finish = now_micros() * 1e-6; - - // Pretend at least one op was done in case we are running a benchmark - // that does not call FinishedSingleOp(). - if (done_ < 1) done_ = 1; - - if (bytes_ > 0) { - char rate[100]; - snprintf(rate, sizeof(rate), "%6.1f MB/s", - (bytes_ / 1048576.0) / (finish - start_)); - if (!message_.empty()) { - message_ = std::string(rate) + " " + message_; - } else { - message_ = rate; - } - } - - fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n", - name.to_string().c_str(), - (finish - start_) * 1e6 / done_, - (message_.empty() ? "" : " "), - message_.c_str()); - if (FLAGS_histogram) { - fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); - } - fflush(stdout); - } - public: Benchmark() @@ -306,6 +392,8 @@ class Benchmark { PrintHeader(); Open(); + int cnt = 0; + const char* benchmarks = FLAGS_benchmarks; while (benchmarks != NULL) { const char* sep = strchr(benchmarks, ','); @@ -318,10 +406,11 @@ class Benchmark { benchmarks = sep + 1; } - void (Benchmark::*method)() = NULL; + void (Benchmark::*method)(ThreadState*) = NULL; - bool known = true; bool fresh_db = false; + int num_threads = FLAGS_threads; + if (name == Slice("fillseq")) { fresh_db = true; method = &Benchmark::WriteSeq; @@ -332,8 +421,9 @@ class Benchmark { method = &Benchmark::ReadSequential; } else if (name == Slice("readrandom")) { method = &Benchmark::ReadRandom; + } else if (name == Slice("readhot")) { + method = &Benchmark::ReadHot; } else { - known = false; if (name != Slice()) { // No error message for empty name fprintf(stderr, "unknown benchmark '%s'\n", name.to_string().c_str()); } @@ -344,24 +434,99 @@ class Benchmark { if (FLAGS_use_existing_db) { message_ = "skipping (--use_existing_db is true)"; method = NULL; + } else { + if (cnt != 0) { + delete db_; + db_ = NULL; + Open(); + } } + } - delete db_; - db_ = NULL; + if (method) { + RunBenchmark(num_threads, name, method); + cnt ++; + } + } + } - Open(); +private: + struct ThreadArg { + Benchmark* bm; + SharedState* shared; + ThreadState* thread; + void (Benchmark::*method)(ThreadState*); + }; + + static void* ThreadBody(void* v) { + ThreadArg* arg = reinterpret_cast(v); + SharedState* shared = arg->shared; + ThreadState* thread = arg->thread; + { + ScopedMutex l(&shared->mu); + shared->num_initialized++; + if (shared->num_initialized >= shared->total) { + shared->cv.notify_all(); + } + while (!shared->start) { + shared->cv.wait(); } + } - Start(); + thread->stats.Start(); + (arg->bm->*(arg->method))(thread); + thread->stats.Stop(); - if (method) { - (this->*method)(); + { + ScopedMutex l(&shared->mu); + shared->num_done++; + if (shared->num_done >= shared->total) { + shared->cv.notify_all(); } + } + return NULL; + } - if (known) { - Stop(name); - } + void RunBenchmark(int n, Slice name, + void (Benchmark::*method)(ThreadState*)) { + SharedState shared; + shared.total = n; + shared.num_initialized = 0; + shared.num_done = 0; + shared.start = false; + + ThreadArg* arg = new ThreadArg[n]; + for (int i = 0; i < n; i++) { + arg[i].bm = this; + arg[i].method = method; + arg[i].shared = &shared; + arg[i].thread = new ThreadState(i); + arg[i].thread->shared = &shared; + Thread *thr = new Thread(ThreadBody); + thr->start(&arg[i]); + } + + shared.mu.lock(); + while (shared.num_initialized < n) { + shared.cv.wait(); + } + + shared.start = true; + shared.cv.notify_all(); + while (shared.num_done < n) { + shared.cv.wait(); + } + shared.mu.unlock(); + + for (int i = 1; i < n; i++) { + arg[0].thread->stats.Merge(arg[i].thread->stats); + } + arg[0].thread->stats.Report(name); + + for (int i = 0; i < n; i++) { + delete arg[i].thread; } + delete[] arg; } void Open() @@ -390,54 +555,80 @@ class Benchmark { } } - void WriteSeq() + void WriteSeq(ThreadState* thread) { - Write(false); + Write(thread, false); } - void WriteRandom() + void WriteRandom(ThreadState* thread) { - Write(true); + Write(thread, true); } - void Write(bool random) + void Write(ThreadState* thread, bool random) { + int64_t bytes = 0; for (size_t i = 0; i < num_; i++ ) { - uint64_t k = random ? rand(): i; + uint64_t k = random ? rand() % FLAGS_num: i; char key[100]; snprintf(key, sizeof(key), "%016ld", k); + bytes += FLAGS_value_size + strlen(key); if (!db_->put(key, gen_.Generate(FLAGS_value_size))) { fprintf(stderr, "put key %ld error\n", k); } - FinishedSingleOp(); + thread->stats.FinishedSingleOp(); } + thread->stats.AddBytes(bytes); } - void ReadSequential() { + void ReadSequential(ThreadState* thread) { + int bytes = 0; Slice value; for (size_t i = 0; i < reads_; i++) { uint64_t k = i; char key[100]; snprintf(key, sizeof(key), "%016ld", k); if (db_->get(key, value)) { + bytes += value.size() + strlen(key); + value.destroy(); + } + thread->stats.FinishedSingleOp(); + } + thread->stats.AddBytes(bytes); + } + + void ReadRandom(ThreadState* thread) { + int bytes = 0; + Slice value; + for (size_t i = 0; i < reads_; i++) { + uint64_t k = rand() % FLAGS_num; + char key[100]; + snprintf(key, sizeof(key), "%016ld", k); + if (db_->get(key, value)) { + bytes += value.size() + strlen(key); value.destroy(); } - FinishedSingleOp(); + thread->stats.FinishedSingleOp(); } + thread->stats.AddBytes(bytes); } - void ReadRandom() { + void ReadHot(ThreadState* thread) { + int bytes = 0; Slice value; + uint64_t range = (FLAGS_num + 99) / 100; for (size_t i = 0; i < reads_; i++) { - uint64_t k = rand(); char key[100]; + uint64_t k = rand() % range; snprintf(key, sizeof(key), "%016ld", k); if (db_->get(key, value)) { + bytes += value.size() + strlen(key); value.destroy(); } - FinishedSingleOp(); + thread->stats.FinishedSingleOp(); } + thread->stats.AddBytes(bytes); } }; @@ -448,7 +639,9 @@ int main(int argc, char** argv) double d; long n; char junk; - if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) { + if (strncmp(argv[i], "--benchmarks=", 13) == 0) { + FLAGS_benchmarks = argv[i] + strlen("--benchmarks="); + } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) { FLAGS_compression_ratio = d; } else if (sscanf(argv[i], "--histogram=%ld%c", &n, &junk) == 1 && (n == 0 || n == 1)) { @@ -462,6 +655,8 @@ int main(int argc, char** argv) FLAGS_num = n; } else if (sscanf(argv[i], "--reads=%ld%c", &n, &junk) == 1) { FLAGS_reads = n; + } else if (sscanf(argv[i], "--threads=%ld%c", &n, &junk) == 1) { + FLAGS_threads = n; } else if (sscanf(argv[i], "--value_size=%ld%c", &n, &junk) == 1) { FLAGS_value_size = n; } else if(strncmp(argv[i], "--db=", 5) == 0) {