Skip to content

Commit

Permalink
Add multithreaded transaction test
Browse files Browse the repository at this point in the history
Summary: Refactored db_bench transaction stress tests so that they can be called from unit tests as well.

Test Plan: run new unit test as well as db_bench

Reviewers: yhchiang, IslamAbdelRahman, sdong

Reviewed By: IslamAbdelRahman

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D55203
  • Loading branch information
agiardullo committed Mar 11, 2016
1 parent e8e6cf0 commit 7902528
Show file tree
Hide file tree
Showing 6 changed files with 515 additions and 242 deletions.
1 change: 1 addition & 0 deletions src.mk
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ LIB_SOURCES = \
util/env_posix.cc \
util/io_posix.cc \
util/thread_posix.cc \
util/transaction_test_util.cc \
util/sst_file_manager_impl.cc \
util/file_util.cc \
util/file_reader_writer.cc \
Expand Down
209 changes: 39 additions & 170 deletions tools/db_bench_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,34 @@

#include "db/db_impl.h"
#include "db/version_set.h"
#include "rocksdb/options.h"
#include "hdfs/env_hdfs.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/slice.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/utilities/flashcache.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "util/crc32c.h"
#include "rocksdb/write_batch.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/histogram.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/statistics.h"
#include "util/string_util.h"
#include "util/testutil.h"
#include "util/transaction_test_util.h"
#include "util/xxhash.h"
#include "hdfs/env_hdfs.h"
#include "utilities/merge_operators.h"

#ifdef OS_WIN
Expand Down Expand Up @@ -3763,18 +3764,22 @@ class Benchmark {
ReadOptions options(FLAGS_verify_checksum, true);
Duration duration(FLAGS_duration, readwrites_);
ReadOptions read_options(FLAGS_verify_checksum, true);
std::string value;
DB* db = db_.db;
uint64_t transactions_done = 0;
uint64_t transactions_aborted = 0;
Status s;
uint64_t num_prefix_ranges = FLAGS_transaction_sets;
uint64_t transactions_done = 0;

if (num_prefix_ranges == 0 || num_prefix_ranges > 9999) {
fprintf(stderr, "invalid value for transaction_sets\n");
abort();
}

TransactionOptions txn_options;
txn_options.lock_timeout = FLAGS_transaction_lock_timeout;
txn_options.set_snapshot = FLAGS_transaction_set_snapshot;

RandomTransactionInserter inserter(&thread->rand, write_options_,
read_options, FLAGS_num,
num_prefix_ranges);

if (FLAGS_num_multi_db > 1) {
fprintf(stderr,
"Cannot run RandomTransaction benchmark with "
Expand All @@ -3783,134 +3788,34 @@ class Benchmark {
}

while (!duration.Done(1)) {
Transaction* txn = nullptr;
WriteBatch* batch = nullptr;
bool success;

// RandomTransactionInserter will attempt to insert a key for each
// # of FLAGS_transaction_sets
if (FLAGS_optimistic_transaction_db) {
txn = db_.opt_txn_db->BeginTransaction(write_options_);
assert(txn);
success = inserter.OptimisticTransactionDBInsert(db_.opt_txn_db);
} else if (FLAGS_transaction_db) {
TransactionDB* txn_db = reinterpret_cast<TransactionDB*>(db_.db);

TransactionOptions txn_options;
txn_options.lock_timeout = FLAGS_transaction_lock_timeout;

txn = txn_db->BeginTransaction(write_options_, txn_options);
assert(txn);
} else {
batch = new WriteBatch();
}

if (txn && FLAGS_transaction_set_snapshot) {
txn->SetSnapshot();
}

// pick a random number to use to increment a key in each set
uint64_t incr = (thread->rand.Next() % 100) + 1;

bool failed = false;
// For each set, pick a key at random and increment it
for (uint8_t i = 0; i < num_prefix_ranges; i++) {
uint64_t int_value;
char prefix_buf[5];

// key format: [SET#][random#]
std::string rand_key = ToString(thread->rand.Next() % FLAGS_num);
Slice base_key(rand_key);

// Pad prefix appropriately so we can iterate over each set
snprintf(prefix_buf, sizeof(prefix_buf), "%04d", i + 1);
std::string full_key = std::string(prefix_buf) + base_key.ToString();
Slice key(full_key);

if (txn) {
s = txn->GetForUpdate(read_options, key, &value);
} else {
s = db->Get(read_options, key, &value);
}

if (s.ok()) {
int_value = std::stoull(value);

if (int_value == 0 || int_value == ULONG_MAX) {
fprintf(stderr, "Get returned unexpected value: %s\n",
value.c_str());
abort();
}
} else if (s.IsNotFound()) {
int_value = 0;
} else if (!(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) {
fprintf(stderr, "Get returned an unexpected error: %s\n",
s.ToString().c_str());
abort();
} else {
failed = true;
break;
}

if (FLAGS_transaction_sleep > 0) {
FLAGS_env->SleepForMicroseconds(thread->rand.Next() %
FLAGS_transaction_sleep);
}

std::string sum = ToString(int_value + incr);
if (txn) {
s = txn->Put(key, sum);
if (!s.ok()) {
// Since we did a GetForUpdate, Put should not fail.
fprintf(stderr, "Put returned an unexpected error: %s\n",
s.ToString().c_str());
abort();
}
} else {
batch->Put(key, sum);
}
}

if (txn) {
if (failed) {
transactions_aborted++;
txn->Rollback();
s = Status::OK();
} else {
s = txn->Commit();
}
success = inserter.TransactionDBInsert(txn_db, txn_options);
} else {
s = db->Write(write_options_, batch);
}

if (!s.ok()) {
failed = true;

// Ideally, we'd want to run this stress test with enough concurrency
// on a small enough set of keys that we get some failed transactions
// due to conflicts.
if (FLAGS_optimistic_transaction_db &&
(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) {
transactions_aborted++;
} else if (FLAGS_transaction_db && s.IsExpired()) {
transactions_aborted++;
} else {
fprintf(stderr, "Unexpected write error: %s\n", s.ToString().c_str());
abort();
}
success = inserter.DBInsert(db_.db);
}

delete txn;
delete batch;

if (!failed) {
thread->stats.FinishedOps(nullptr, db, 1, kOthers);
if (!success) {
fprintf(stderr, "Unexpected error: %s\n",
inserter.GetLastStatus().ToString().c_str());
abort();
}

thread->stats.FinishedOps(nullptr, db_.db, 1, kOthers);
transactions_done++;
}

char msg[100];
if (FLAGS_optimistic_transaction_db || FLAGS_transaction_db) {
snprintf(msg, sizeof(msg),
"( transactions:%" PRIu64 " aborts:%" PRIu64 ")",
transactions_done, transactions_aborted);
transactions_done, inserter.GetFailureCount());
} else {
snprintf(msg, sizeof(msg), "( batches:%" PRIu64 " )", transactions_done);
}
Expand All @@ -3930,50 +3835,14 @@ class Benchmark {
return;
}

uint64_t prev_total = 0;

// For each set of keys with the same prefix, sum all the values
for (uint32_t i = 0; i < FLAGS_transaction_sets; i++) {
char prefix_buf[5];
snprintf(prefix_buf, sizeof(prefix_buf), "%04u", i + 1);
uint64_t total = 0;

Iterator* iter = db_.db->NewIterator(ReadOptions());

for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) {
Slice key = iter->key();

// stop when we reach a different prefix
if (key.ToString().compare(0, 4, prefix_buf) != 0) {
break;
}

Slice value = iter->value();
uint64_t int_value = std::stoull(value.ToString());
if (int_value == 0 || int_value == ULONG_MAX) {
fprintf(stderr, "Iter returned unexpected value: %s\n",
value.ToString().c_str());
abort();
}
Status s =
RandomTransactionInserter::Verify(db_.db, FLAGS_transaction_sets);

total += int_value;
}
delete iter;

if (i > 0) {
if (total != prev_total) {
fprintf(stderr,
"RandomTransactionVerify found inconsistent totals. "
"Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64
" \n",
i - 1, prev_total, i, total);
abort();
}
}
prev_total = total;
if (s.ok()) {
fprintf(stdout, "RandomTransactionVerify Success.\n");
} else {
fprintf(stdout, "RandomTransactionVerify FAILED!!\n");
}

fprintf(stdout, "RandomTransactionVerify Success!\n");
}
#endif // ROCKSDB_LITE

Expand Down
Loading

0 comments on commit 7902528

Please sign in to comment.