Skip to content

Commit

Permalink
Fix super version change, it's hack
Browse files Browse the repository at this point in the history
Signed-off-by: subains <[email protected]>
  • Loading branch information
subains committed Dec 9, 2022
1 parent 95e9ebd commit e3741db
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 18 deletions.
31 changes: 28 additions & 3 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,18 @@ SuperVersion::~SuperVersion() {

SuperVersion* SuperVersion::Ref() {
refs.fetch_add(1, std::memory_order_relaxed);
std::cout << "SV: " << this << " ref: " << refs << "\n";
return this;
}

uint32_t SuperVersion::GetRef() const {
return refs.load();
}

bool SuperVersion::Unref() {
// fetch_sub returns the previous value of ref
std::cout << "SV: " << this << " unref: " << refs << "\n";

uint32_t previous_refs = refs.fetch_sub(1);
assert(previous_refs > 0);
return previous_refs == 1;
Expand Down Expand Up @@ -672,6 +679,8 @@ ColumnFamilyData::~ColumnFamilyData() {
}

bool ColumnFamilyData::UnrefAndTryDelete() {
std::cout << "CFD: " << this << ", refs_.sub: " << refs_ << "\n";

int old_refs = refs_.fetch_sub(1);
assert(old_refs > 0);

Expand Down Expand Up @@ -1219,7 +1228,7 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
// local pointer to guarantee exclusive access. If the thread local pointer
// is being used while a new SuperVersion is installed, the cached
// SuperVersion can become stale. In that case, the background thread would
// have swapped in kSVObsolete. We re-check the value at when returning
// have swapped in kSVObsolete. We re-check the value when returning
// SuperVersion back to thread local, with an atomic compare and swap.
// The superversion will need to be released if detected to be stale.
void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
Expand All @@ -1228,8 +1237,8 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
// (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
// should only keep kSVInUse before ReturnThreadLocalSuperVersion call
// (if no Scrape happens).
// assert(ptr != SuperVersion::kSVInUse);
if (ptr == SuperVersion::kSVInUse) {
// FIXME: Check version number too.
return super_version_->Ref();
}

Expand Down Expand Up @@ -1260,23 +1269,38 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
delete sv_to_delete;
}
assert(sv != nullptr);
std::cout << "SV: " << sv << std::endl;
return sv;
}

bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
assert(sv != nullptr);

// Put the SuperVersion back
void* expected = SuperVersion::kSVInUse;

if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
// When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
// storage has not been altered and no Scrape has happened. The
// SuperVersion is still current.
std::cout << "SV: " << sv << " in use\n";
return true;
} else if (expected != nullptr) {
auto ptr = static_cast<SuperVersion*>(local_sv_->Swap(sv));
if (sv != ptr) {
ptr->Unref();
refs_.fetch_sub(1);
ptr->Cleanup();
delete ptr;
} else {
sv->Unref();
}
return true;
} else {
// ThreadLocal scrape happened in the process of this GetImpl call (after
// thread local Swap() at the beginning and before CompareAndSwap()).
// This means the SuperVersion it holds is obsolete.
// assert(expected == SuperVersion::kSVObsolete);
std::cout << "SV: " << sv << " (" << expected << ") obsolete : " << "\n";
}
return false;
}
Expand Down Expand Up @@ -1333,6 +1357,7 @@ void ColumnFamilyData::ResetThreadLocalSuperVersions() {
continue;
}
auto sv = static_cast<SuperVersion*>(ptr);

bool was_last_ref __attribute__((__unused__));
was_last_ref = sv->Unref();
// sv couldn't have been the last reference because
Expand Down
8 changes: 7 additions & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ struct SuperVersion {
// should be called outside the mutex
SuperVersion() = default;
~SuperVersion();

uint32_t GetRef() const;

SuperVersion* Ref();
// If Unref() returns true, Cleanup() should be called with mutex held
// before deleting this SuperVersion.
Expand Down Expand Up @@ -274,7 +277,10 @@ class ColumnFamilyData {
// Ref() can only be called from a context where the caller can guarantee
// that ColumnFamilyData is alive (while holding a non-zero ref already,
// holding a DB mutex, or as the leader in a write batch group).
void Ref() { refs_.fetch_add(1); }
void Ref() {
std::cout << "CFD: " << this << ", refs_.add: " << refs_ << "\n";
refs_.fetch_add(1);
}

// UnrefAndTryDelete() decreases the reference count and do free if needed,
// return true if this is freed else false, UnrefAndTryDelete() can only
Expand Down
15 changes: 4 additions & 11 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4045,17 +4045,9 @@ bool DBImpl::GetAggregatedIntProperty(const Slice& property,
return ret;
}

SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd,
bool useThreadLocalCache) {
if (useThreadLocalCache) {
// TODO(ljin): consider using GetReferencedSuperVersion() directly
return cfd->GetThreadLocalSuperVersion(this);
} else {
this->mutex()->Lock();
auto sv = cfd->GetSuperVersion();
this->mutex()->Unlock();
return sv;
}
SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
// TODO(ljin): consider using GetReferencedSuperVersion() directly
return cfd->GetThreadLocalSuperVersion(this);
}

// REQUIRED: this function should only be called on the write thread or if the
Expand All @@ -4072,6 +4064,7 @@ SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {

void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
// Release SuperVersion
std::cout << "SV: Cleanup: " << sv << "\n";
if (sv->Unref()) {
bool defer_purge =
immutable_db_options().avoid_unnecessary_blocking_io;
Expand Down
3 changes: 1 addition & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -785,8 +785,7 @@ class DBImpl : public DB {
// Find Super version and reference it. Based on options, it might return
// the thread local cached one.
// Call ReturnAndCleanupSuperVersion() when it is no longer needed.
SuperVersion* GetAndRefSuperVersion(ColumnFamilyData* cfd,
bool useThreadLocalCache = true);
SuperVersion* GetAndRefSuperVersion(ColumnFamilyData* cfd);

// Similar to the previous function but looks up based on a column family id.
// nullptr will be returned if this column family no longer exists.
Expand Down
2 changes: 1 addition & 1 deletion utilities/transactions/transaction_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Status TransactionUtil::CheckKeyForConflicts(

auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
auto cfd = cfh->cfd();
SuperVersion* sv = db_impl->GetAndRefSuperVersion(cfd, true);
SuperVersion* sv = db_impl->GetAndRefSuperVersion(cfd);

if (sv == nullptr) {
result = Status::InvalidArgument("Could not access column family " +
Expand Down

0 comments on commit e3741db

Please sign in to comment.