Skip to content

Commit

Permalink
Support WriteCommit policy with sync_fault_injection=1 (facebook#10624)
Browse files Browse the repository at this point in the history
Summary:
**Context:**
Prior to this PR, correctness testing with un-sync data loss [disabled](facebook#10605) transaction (`use_txn=1`) thus all of the `txn_write_policy` . This PR improved that by adding support for one policy - WriteCommit (`txn_write_policy=0`).

**Summary:**
They key to this support is (a) handle Mark{Begin, End}Prepare/MarkCommit/MarkRollback in constructing ExpectedState under WriteCommit policy correctly and (b) monitor CI jobs and solve any test incompatibility issue till jobs are stable. (b) will be part of the test plan.

For (a)
- During prepare (i.e, between `MarkBeginPrepare()` and `MarkEndPrepare(xid)`), `ExpectedStateTraceRecordHandler` will buffer all writes by adding all writes to an internal `WriteBatch`.
- On `MarkEndPrepare()`, that `WriteBatch` will be associated with the transaction's `xid`.
- During the commit (i.e, on `MarkCommit(xid)`), `ExpectedStateTraceRecordHandler` will retrieve and iterate the internal `WriteBatch` and finally apply those writes to `ExpectedState`
- During the rollback (i.e, on `MarkRollback(xid)`), `ExpectedStateTraceRecordHandler` will erase the internal `WriteBatch` from the map.

For (b) - one major issue described below:
- TransactionsDB in db stress recovers prepared-but-not-committed txns from the previous crashed run by randomly committing or rolling back it at the start of the current run, see a historical [PR](facebook@6d06be2) predated correctness testing.
- And we will verify those processed keys in a recovered db against their expected state.
- However since now we turn on `sync_fault_injection=1` where the expected state is constructed from the trace instead of using the LATEST.state from previous run. The expected state now used to verify those processed keys won't contain UNKNOWN_SENTINEL as they should - see test 1 for a failed case.
- Therefore, we decided to manually update its expected state to be UNKNOWN_SENTINEL as part of the processing.

Pull Request resolved: facebook#10624

Test Plan:
1. Test exposed the major issue described above. This test will fail without setting UNKNOWN_SENTINEL in expected state during the processing and pass after
```
db=/dev/shm/rocksdb_crashtest_blackbox
exp=/dev/shm/rocksdb_crashtest_expected
dbt=$db.tmp
expt=$exp.tmp

rm -rf $db $exp
mkdir -p $exp

echo "RUN 1"
./db_stress \
--clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=0 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --test_batches_snapshots=0 --value_size_mult=32 --writepercent=90 \
--use_txn=1 --txn_write_policy=0 --sync_fault_injection=1 &
pid=$!
sleep 0.2
sleep 20
kill $pid
sleep 0.2

echo "RUN 2"
./db_stress \
--clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=0 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --test_batches_snapshots=0 --value_size_mult=32 --writepercent=90 \
--use_txn=1 --txn_write_policy=0 --sync_fault_injection=1 &
pid=$!
sleep 0.2
sleep 20
kill $pid
sleep 0.2

echo "RUN 3"
./db_stress \
--clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=0 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --test_batches_snapshots=0 --value_size_mult=32 --writepercent=90 \
--use_txn=1 --txn_write_policy=0 --sync_fault_injection=1
```

2. Manual testing to ensure ExpectedState is constructed correctly during recovery by verifying it against previously crashed TransactionDB's WAL.
   - Run the following command to crash a TransactionDB with WriteCommit policy. Then `./ldb dump_wal` on its WAL file
```
db=/dev/shm/rocksdb_crashtest_blackbox
exp=/dev/shm/rocksdb_crashtest_expected
rm -rf $db $exp
mkdir -p $exp

./db_stress \
	--clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=0 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --test_batches_snapshots=0 --value_size_mult=32 --writepercent=90 \
	--use_txn=1 --txn_write_policy=0 --sync_fault_injection=1 &
pid=$!
sleep 30
kill $pid
sleep 1
```
- Run the following command to verify recovery of the crashed db under debugger. Compare the step-wise result with WAL records (e.g, WriteBatch content, xid, prepare/commit/rollback marker)
```
   ./db_stress \
	--clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=0 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --test_batches_snapshots=0 --value_size_mult=32 --writepercent=90 \
	--use_txn=1 --txn_write_policy=0 --sync_fault_injection=1
```
3. Automatic testing by triggering all RocksDB stress/crash test jobs for 3 rounds with no failure.

Reviewed By: ajkr, riversand963

Differential Revision: D39199373

Pulled By: hx235

fbshipit-source-id: 7a1dec0e3e2ee6ea86ddf5dd19ceb5543a3d6f0c
  • Loading branch information
hx235 authored and facebook-github-bot committed Sep 27, 2022
1 parent 5d7cf31 commit aed30dd
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 28 deletions.
65 changes: 46 additions & 19 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "test_util/testutil.h"
#include "util/cast_util.h"
#include "utilities/backup/backup_engine_impl.h"
Expand Down Expand Up @@ -309,7 +310,15 @@ void StressTest::FinishInitDb(SharedState* shared) {
exit(1);
}
}

#ifndef ROCKSDB_LITE
if (FLAGS_use_txn) {
// It's OK here without sync because unsynced data cannot be lost at this
// point
// - even with sync_fault_injection=1 as the
// file is still directly writable until after FinishInitDb()
ProcessRecoveredPreparedTxns(shared);
}
#endif
if (FLAGS_enable_compaction_filter) {
auto* compaction_filter_factory =
reinterpret_cast<DbStressCompactionFilterFactory*>(
Expand Down Expand Up @@ -555,6 +564,42 @@ Status StressTest::SetOptions(ThreadState* thread) {
}

#ifndef ROCKSDB_LITE
void StressTest::ProcessRecoveredPreparedTxns(SharedState* shared) {
assert(txn_db_);
std::vector<Transaction*> recovered_prepared_trans;
txn_db_->GetAllPreparedTransactions(&recovered_prepared_trans);
for (Transaction* txn : recovered_prepared_trans) {
ProcessRecoveredPreparedTxnsHelper(txn, shared);
delete txn;
}
recovered_prepared_trans.clear();
txn_db_->GetAllPreparedTransactions(&recovered_prepared_trans);
assert(recovered_prepared_trans.size() == 0);
}

void StressTest::ProcessRecoveredPreparedTxnsHelper(Transaction* txn,
SharedState* shared) {
thread_local Random rand(static_cast<uint32_t>(FLAGS_seed));
for (size_t i = 0; i < column_families_.size(); ++i) {
std::unique_ptr<WBWIIterator> wbwi_iter(
txn->GetWriteBatch()->NewIterator(column_families_[i]));
for (wbwi_iter->SeekToFirst(); wbwi_iter->Valid(); wbwi_iter->Next()) {
uint64_t key_val;
if (GetIntVal(wbwi_iter->Entry().key.ToString(), &key_val)) {
shared->Put(static_cast<int>(i) /* cf_idx */, key_val,
0 /* value_base */, true /* pending */);
}
}
}
if (rand.OneIn(2)) {
Status s = txn->Commit();
assert(s.ok());
} else {
Status s = txn->Rollback();
assert(s.ok());
}
}

Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) {
if (!FLAGS_use_txn) {
return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
Expand Down Expand Up @@ -2648,24 +2693,6 @@ void StressTest::Open(SharedState* shared) {
db_ = txn_db_;
db_aptr_.store(txn_db_, std::memory_order_release);
}

// after a crash, rollback to commit recovered transactions
std::vector<Transaction*> trans;
txn_db_->GetAllPreparedTransactions(&trans);
Random rand(static_cast<uint32_t>(FLAGS_seed));
for (auto txn : trans) {
if (rand.OneIn(2)) {
s = txn->Commit();
assert(s.ok());
} else {
s = txn->Rollback();
assert(s.ok());
}
delete txn;
}
trans.clear();
txn_db_->GetAllPreparedTransactions(&trans);
assert(trans.size() == 0);
#endif
}
if (!s.ok()) {
Expand Down
15 changes: 12 additions & 3 deletions db_stress_tool/db_stress_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,10 @@ class StressTest {
// The initialization work is split into two parts to avoid a circular
// dependency with `SharedState`.
virtual void FinishInitDb(SharedState*);

void TrackExpectedState(SharedState* shared);

void OperateDb(ThreadState* thread);
virtual void VerifyDb(ThreadState* thread) const = 0;
virtual void ContinuouslyVerifyDb(ThreadState* /*thread*/) const = 0;

void PrintStatistics();

protected:
Expand All @@ -54,6 +51,18 @@ class StressTest {
Status SetOptions(ThreadState* thread);

#ifndef ROCKSDB_LITE
// For transactionsDB, there can be txns prepared but not yet committeed
// right before previous stress run crash.
// They will be recovered and processed through
// ProcessRecoveredPreparedTxnsHelper on the start of current stress run.
void ProcessRecoveredPreparedTxns(SharedState* shared);

// Default implementation will first update ExpectedState to be
// `SharedState::UNKNOWN` for each keys in `txn` and then randomly
// commit or rollback `txn`.
virtual void ProcessRecoveredPreparedTxnsHelper(Transaction* txn,
SharedState* shared);

Status NewTxn(WriteOptions& write_opts, Transaction** txn);

Status CommitTxn(Transaction* txn, ThreadState* thread = nullptr);
Expand Down
8 changes: 8 additions & 0 deletions db_stress_tool/db_stress_tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,14 @@ int db_stress_tool(int argc, char** argv) {
exit(1);
}

if (FLAGS_use_txn && FLAGS_sync_fault_injection &&
FLAGS_txn_write_policy != 0) {
fprintf(stderr,
"For TransactionDB, correctness testing with unsync data loss is "
"currently compatible with only write committed policy\n");
exit(1);
}

#ifndef NDEBUG
KillPoint* kp = KillPoint::GetInstance();
kp->rocksdb_kill_odds = FLAGS_kill_random_test;
Expand Down
85 changes: 84 additions & 1 deletion db_stress_tool/expected_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,9 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler,
public WriteBatch::Handler {
public:
ExpectedStateTraceRecordHandler(uint64_t max_write_ops, ExpectedState* state)
: max_write_ops_(max_write_ops), state_(state) {}
: max_write_ops_(max_write_ops),
state_(state),
buffered_writes_(nullptr) {}

~ExpectedStateTraceRecordHandler() { assert(IsDone()); }

Expand Down Expand Up @@ -391,6 +393,12 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler,
}
uint32_t value_id = GetValueBase(value);

bool should_buffer_write = !(buffered_writes_ == nullptr);
if (should_buffer_write) {
return WriteBatchInternal::Put(buffered_writes_.get(), column_family_id,
key, value);
}

state_->Put(column_family_id, static_cast<int64_t>(key_id), value_id,
false /* pending */);
++num_write_ops_;
Expand All @@ -406,6 +414,12 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler,
return Status::Corruption("unable to parse key", key.ToString());
}

bool should_buffer_write = !(buffered_writes_ == nullptr);
if (should_buffer_write) {
return WriteBatchInternal::Delete(buffered_writes_.get(),
column_family_id, key);
}

state_->Delete(column_family_id, static_cast<int64_t>(key_id),
false /* pending */);
++num_write_ops_;
Expand All @@ -414,6 +428,18 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler,

Status SingleDeleteCF(uint32_t column_family_id,
const Slice& key_with_ts) override {
bool should_buffer_write = !(buffered_writes_ == nullptr);
if (should_buffer_write) {
Slice key =
StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);
Slice ts =
ExtractTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);
std::array<Slice, 2> key_with_ts_arr{{key, ts}};
return WriteBatchInternal::SingleDelete(
buffered_writes_.get(), column_family_id,
SliceParts(key_with_ts_arr.data(), 2));
}

return DeleteCF(column_family_id, key_with_ts);
}

Expand All @@ -433,6 +459,12 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler,
return Status::Corruption("unable to parse end key", end_key.ToString());
}

bool should_buffer_write = !(buffered_writes_ == nullptr);
if (should_buffer_write) {
return WriteBatchInternal::DeleteRange(
buffered_writes_.get(), column_family_id, begin_key, end_key);
}

state_->DeleteRange(column_family_id, static_cast<int64_t>(begin_key_id),
static_cast<int64_t>(end_key_id), false /* pending */);
++num_write_ops_;
Expand All @@ -443,13 +475,64 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler,
const Slice& value) override {
Slice key =
StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);

bool should_buffer_write = !(buffered_writes_ == nullptr);
if (should_buffer_write) {
return WriteBatchInternal::Merge(buffered_writes_.get(), column_family_id,
key, value);
}

return PutCF(column_family_id, key, value);
}

Status MarkBeginPrepare(bool = false) override {
assert(!buffered_writes_);
buffered_writes_.reset(new WriteBatch());
return Status::OK();
}

Status MarkEndPrepare(const Slice& xid) override {
assert(buffered_writes_);
std::string xid_str = xid.ToString();
assert(xid_to_buffered_writes_.find(xid_str) ==
xid_to_buffered_writes_.end());

xid_to_buffered_writes_[xid_str].swap(buffered_writes_);

buffered_writes_.reset();

return Status::OK();
}

Status MarkCommit(const Slice& xid) override {
std::string xid_str = xid.ToString();
assert(xid_to_buffered_writes_.find(xid_str) !=
xid_to_buffered_writes_.end());
assert(xid_to_buffered_writes_.at(xid_str));

Status s = xid_to_buffered_writes_.at(xid_str)->Iterate(this);
xid_to_buffered_writes_.erase(xid_str);

return s;
}

Status MarkRollback(const Slice& xid) override {
std::string xid_str = xid.ToString();
assert(xid_to_buffered_writes_.find(xid_str) !=
xid_to_buffered_writes_.end());
assert(xid_to_buffered_writes_.at(xid_str));
xid_to_buffered_writes_.erase(xid_str);

return Status::OK();
}

private:
uint64_t num_write_ops_ = 0;
uint64_t max_write_ops_;
ExpectedState* state_;
std::unordered_map<std::string, std::unique_ptr<WriteBatch>>
xid_to_buffered_writes_;
std::unique_ptr<WriteBatch> buffered_writes_;
};

} // anonymous namespace
Expand Down
22 changes: 22 additions & 0 deletions db_stress_tool/multi_ops_txns_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,10 @@ void MultiOpsTxnsStressTest::FinishInitDb(SharedState* shared) {
if (FLAGS_enable_compaction_filter) {
// TODO (yanqin) enable compaction filter
}
#ifndef ROCKSDB_LITE
ProcessRecoveredPreparedTxns(shared);
#endif

ReopenAndPreloadDbIfNeeded(shared);
// TODO (yanqin) parallelize if key space is large
for (auto& key_gen : key_gen_for_a_) {
Expand Down Expand Up @@ -1353,6 +1357,18 @@ uint32_t MultiOpsTxnsStressTest::GenerateNextC(ThreadState* thread) {
}

#ifndef ROCKSDB_LITE
void MultiOpsTxnsStressTest::ProcessRecoveredPreparedTxnsHelper(
Transaction* txn, SharedState*) {
thread_local Random rand(static_cast<uint32_t>(FLAGS_seed));
if (rand.OneIn(2)) {
Status s = txn->Commit();
assert(s.ok());
} else {
Status s = txn->Rollback();
assert(s.ok());
}
}

Status MultiOpsTxnsStressTest::WriteToCommitTimeWriteBatch(Transaction& txn) {
WriteBatch* ctwb = txn.GetCommitTimeWriteBatch();
assert(ctwb);
Expand Down Expand Up @@ -1776,6 +1792,12 @@ void CheckAndSetOptionsForMultiOpsTxnStressTest() {
exit(1);
}
}
if (FLAGS_sync_fault_injection == 1) {
fprintf(stderr,
"Sync fault injection is currently not supported in "
"-test_multi_ops_txns\n");
exit(1);
}
#else
fprintf(stderr, "-test_multi_ops_txns not supported in ROCKSDB_LITE mode\n");
exit(1);
Expand Down
4 changes: 4 additions & 0 deletions db_stress_tool/multi_ops_txns_stress.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,10 @@ class MultiOpsTxnsStressTest : public StressTest {
uint32_t GenerateNextC(ThreadState* thread);

#ifndef ROCKSDB_LITE
// Randomly commit or rollback `txn`
void ProcessRecoveredPreparedTxnsHelper(Transaction* txn,
SharedState*) override;

// Some applications, e.g. MyRocks writes a KV pair to the database via
// commit-time-write-batch (ctwb) in additional to the transaction's regular
// write batch. The key is usually constant representing some system
Expand Down
10 changes: 5 additions & 5 deletions tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ def is_direct_io_supported(dbname):
# Re-enable once we have a compaction for MultiOpsTxnStressTest
"enable_compaction_filter": 0,
"create_timestamped_snapshot_one_in": 50,
"sync_fault_injection": 0,
}

multiops_wc_txn_params = {
Expand Down Expand Up @@ -513,10 +514,6 @@ def finalize_and_sanitize(src_params):
dest_params["delpercent"] += dest_params["delrangepercent"]
dest_params["delrangepercent"] = 0
dest_params["ingest_external_file_one_in"] = 0
# Correctness testing with unsync data loss is not currently compatible
# with transactions
if dest_params.get("use_txn") == 1:
dest_params["sync_fault_injection"] = 0
if (
dest_params.get("disable_wal") == 1
or dest_params.get("sync_fault_injection") == 1
Expand Down Expand Up @@ -594,7 +591,10 @@ def finalize_and_sanitize(src_params):
if dest_params.get("create_timestamped_snapshot_one_in", 0) > 0:
dest_params["txn_write_policy"] = 0
dest_params["unordered_write"] = 0

# For TransactionDB, correctness testing with unsync data loss is currently
# compatible with only write committed policy
if (dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy") != 0):
dest_params["sync_fault_injection"] = 0
return dest_params


Expand Down

0 comments on commit aed30dd

Please sign in to comment.