Skip to content

Commit

Permalink
libunwind and more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
MadSchemas committed Feb 19, 2024
1 parent d71b303 commit 01a92b8
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 38 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ jobs:
if [[ $OS == ubuntu-latest ]]; then
cmake -DENABLE_GRPC=ON -DWITH_${{matrix.sanitizer}}=On ..
elif [[ $OS == macos* ]]; then
brew install libunwind
cmake -DGH_CI_OSX=ON ..
else
cmake -DENABLE_GRPC=ON ..
Expand Down
73 changes: 56 additions & 17 deletions cpp_src/cluster/replication/sharedsyncstate.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,19 @@ class SharedSyncState {
using ContainerT = fast_hash_set<std::string, nocase_hash_str, nocase_equal_str, nocase_less_str>;

void MarkSynchronized(std::string name) {
const std::string n = std::move(name);
std::unique_lock<MtxT> lck(mtx_);
if (current_.role == RaftInfo::Role::Leader) {
auto res = synchronized_.emplace(std::move(name));
auto res = synchronized_.emplace(std::move(n));
lck.unlock();
if (res.second) {
std::cout << fmt::sprintf("Marking '%s' as synchronized (with notification)\n", name);
std::cout << fmt::sprintf("Marking '%s' as synchronized (with notification)\n", n);
cond_.notify_all();
} else {
std::cout << fmt::sprintf("Marking '%s' as synchronized (no notification)\n", name);
std::cout << fmt::sprintf("Marking '%s' as synchronized (no notification)\n", n);
}
} else {
std::cout << fmt::sprintf("Attempt to mark '%s' as synchronized, but current role is %s\n", name,
std::cout << fmt::sprintf("Attempt to mark '%s' as synchronized, but current role is %s\n", n,
RaftInfo::RoleToStr(current_.role));
}
}
Expand Down Expand Up @@ -75,9 +76,24 @@ class SharedSyncState {
if (next_.role == RaftInfo::Role::Follower) {
throw Error(errWrongReplicationData, "Node role was changed to follower");
}
std::cout << fmt::sprintf("Initial sync is not done for '%s', awaiting...\n", name);
cond_.wait(lck, ctx);
std::cout << fmt::sprintf("Initial sync is done for '%s'!\n", name);
std::cout << fmt::sprintf("Initial sync is not done for '%s', TID: %s, hash: %d; Awaiting...\n", name,
std::this_thread::get_id(), hash);
try {
cond_.wait(
lck,
[this, &name, hash]() noexcept {
auto res = isInitialSyncDone(name, hash) || terminated_ || next_.role == RaftInfo::Role::Follower;
nocase_hash_str h;
std::cout << fmt::sprintf("AwaitInitialSync(%s / %d / %d) lambda call: %d, terminated_: %d, next_.role: %d\n", name,
hash, h(name), int(res), int(terminated_), int(next_.role));
return res;
},
ctx);
} catch (...) {
std::cout << fmt::sprintf("!!!Exception in AwaitInitialSync(%s)\n", name);
throw;
}
std::cout << fmt::sprintf("Initial sync is done for '%s', TID: %s, hash: %d!\n", name, std::this_thread::get_id(), hash);
}
}
template <typename ContextT>
Expand All @@ -91,7 +107,20 @@ class SharedSyncState {
throw Error(errWrongReplicationData, "Node role was changed to follower");
}
std::cout << fmt::sprintf("Initial sync is not done for 'whole DB', awaiting...\n");
cond_.wait(lck, ctx);
try {
cond_.wait(
lck,
[this]() noexcept {
auto res = isInitialSyncDone() || terminated_ || next_.role == RaftInfo::Role::Follower;
std::cout << fmt::sprintf("AwaitInitialSync() lambda call: %d, terminated_: %d, next_.role: %d\n", int(res),
int(terminated_), int(next_.role));
return res;
},
ctx);
} catch (...) {
std::cout << "!!!Exception in AwaitInitialSync()\n";
throw;
}
std::cout << fmt::sprintf("Initial sync is done for 'whole DB'!\n");
}
}
Expand Down Expand Up @@ -125,17 +154,27 @@ class SharedSyncState {
RaftInfo AwaitRole(bool allowTransitState, const ContextT& ctx) const {
shared_lock<MtxT> lck(mtx_);
if (allowTransitState) {
cond_.wait(
lck, [this] { return !isRunning() || next_ == current_; }, ctx);
try {
cond_.wait(
lck, [this] { return !isRunning() || next_ == current_; }, ctx);
} catch (...) {
std::cout << fmt::sprintf("!!!Exception in AwaitRole(allowTransitState=true)\n");
throw;
}
} else {
std::cout << fmt::sprintf("Awaiting role transition... Current role is %s\n", RaftInfo::RoleToStr(current_.role));
cond_.wait(
lck,
[this] {
return !isRunning() ||
(next_ == current_ && (current_.role == RaftInfo::Role::Leader || current_.role == RaftInfo::Role::Follower));
},
ctx);
try {
cond_.wait(
lck,
[this] {
return !isRunning() || (next_ == current_ &&
(current_.role == RaftInfo::Role::Leader || current_.role == RaftInfo::Role::Follower));
},
ctx);
} catch (...) {
std::cout << fmt::sprintf("!!!Exception in AwaitRole(allowTransitState=false)\n");
throw;
}
std::cout << fmt::sprintf("Role transition done! Current role is %s\n", RaftInfo::RoleToStr(current_.role));
}
return current_;
Expand Down
12 changes: 9 additions & 3 deletions cpp_src/cluster/replication/updatesqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,15 @@ class UpdatesQueue {
lck.lock();
}
static RdxContext dummyCtx_;
condResultReady_.wait(
lck, [&localData] { return localData.executedCnt == localData.dataSize; },
dummyCtx_); // Don't pass cancel context here, because data are already on the leader and we have to handle them
try {
condResultReady_.wait(
lck, [&localData] { return localData.executedCnt == localData.dataSize; },
dummyCtx_); // Don't pass cancel context here, because data are already on the leader and we have to handle them
} catch (...) {
std::cout << "!!!Exception in PushAndWait\n";
throw;
}

return std::make_pair(std::move(localData.err), true);
} catch (...) {
logInfoW([] { return "PushAndWait call has recieved an exception"; });
Expand Down
13 changes: 11 additions & 2 deletions cpp_src/core/namespace/namespaceimpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1480,23 +1480,32 @@ void NamespaceImpl::doTruncate(UpdatesContainer& pendedRepl, const NsContext& ct
void NamespaceImpl::ModifyItem(Item& item, ItemModifyMode mode, const RdxContext& ctx) {
auto name = GetName(ctx);
if (mode == ModeUpsert && !isSystemNamespaceNameFast(name)) {
std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(...) into '%s' begin\n", name);
std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(...) into %d:'%s' begin\n", wal_.GetServer(), name);
}
PerfStatCalculatorMT calc(updatePerfCounter_, enablePerfCounters_);
UpdatesContainer pendedRepl;

CounterGuardAIR32 cg(cancelCommitCnt_);
auto wlck = dataWLock(ctx);

if (mode == ModeUpsert && !isSystemNamespaceNameFast(name)) {
std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(...) into %d:'%s' locked\n", wal_.GetServer(), name);
}
cg.Reset();
calc.LockHit();
if (mode == ModeDelete && rx_unlikely(item.PkFields() != pkFields())) {
throw Error(errNotValid, "Item has outdated PK metadata (probably PK has been change during the Delete-call)");
}
modifyItem(item, mode, pendedRepl, NsContext(ctx));

if (mode == ModeUpsert && !isSystemNamespaceNameFast(name)) {
std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(...) into %d:'%s' replicate call with %d recs\n", wal_.GetServer(), name,
pendedRepl.size());
}

replicate(std::move(pendedRepl), std::move(wlck), true, nullptr, ctx);
if (mode == ModeUpsert && !isSystemNamespaceNameFast(name)) {
std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(...) into '%s' begin\n", name);
std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(...) into %d:'%s' end\n", wal_.GetServer(), name);
}
}

Expand Down
18 changes: 15 additions & 3 deletions cpp_src/core/namespace/namespaceimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,10 @@ class NamespaceImpl : public intrusive_atomic_rc_base { // NOLINT(*performance.

NsWLock() = default;
NsWLock(MutexType &mtx, const RdxContext &ctx, bool isCL) : impl_(mtx, ctx), isClusterLck_(isCL) {}
NsWLock(const NsWLock &) = delete;
NsWLock(NsWLock &&) = default;
NsWLock &operator=(const NsWLock &) = delete;
NsWLock &operator=(NsWLock &&) = default;
void lock() { impl_.lock(); }
void unlock() { impl_.unlock(); }
bool owns_lock() const { return impl_.owns_lock(); }
Expand Down Expand Up @@ -273,15 +277,15 @@ class NamespaceImpl : public intrusive_atomic_rc_base { // NOLINT(*performance.
lck.lock();
checkInvalidation();
synchronized = clusterizator_.IsInitialSyncDone(owner_.name_);
std::cout << fmt::sprintf("'%s' is %s syncronized!\n", owner_.name_, synchronized ? "" : "not");
std::cout << fmt::sprintf("%d:'%s' is %s syncronized!\n", owner_.wal_.GetServer(), owner_.name_, synchronized ? "" : "not");
}

if (!skipClusterStatusCheck) {
owner_.checkClusterStatus(ctx); // throw exception if false
}

if (awaitingSync) {
std::cout << fmt::sprintf("'%s' got lock after sync\n", owner_.name_);
std::cout << fmt::sprintf("%d:'%s' got lock after sync\n", owner_.wal_.GetServer(), owner_.name_);
}

return lck;
Expand Down Expand Up @@ -590,8 +594,12 @@ class NamespaceImpl : public intrusive_atomic_rc_base { // NOLINT(*performance.
QueryStatsCalculatorT &&statCalculator, const NsContext &ctx) {
if (!repl_.temporary) {
assertrx(!ctx.isCopiedNsRequest);
const auto recsSize = recs.size();
const auto name = name_;
const auto invState = int(locker_.InvalidationType().load());
const auto sid = wal_.GetServer();
if (!isSystem()) {
std::cout << fmt::sprintf("Namespace::'%s' replicating %d records\n", name_, recs.size());
std::cout << fmt::sprintf("Namespace::%d:'%s' replicating %d records. Inv state: %d\n", sid, name, recsSize, invState);
}
auto err = clusterizator_.Replicate(
std::move(recs),
Expand All @@ -600,6 +608,10 @@ class NamespaceImpl : public intrusive_atomic_rc_base { // NOLINT(*performance.
wlck.unlock();
},
ctx.rdxContext);
if (!isSystem()) {
std::cout << fmt::sprintf("Namespace::%d:'%s' replication done for %d records. Inv state: %d\n", sid, name, recsSize,
invState);
}
if constexpr (std::is_same_v<QueryStatsCalculatorT, std::nullptr_t>) {
storage_.TryForceFlush();
} else {
Expand Down
26 changes: 13 additions & 13 deletions cpp_src/estl/contexted_cond_var.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ class contexted_cond_var {
}
}

template <typename _Lock, typename _ContextT>
void wait(_Lock& __lock, const _ContextT& __context) {
using namespace std::string_view_literals;
assert(_M_cond_var);
// const auto lockWard = _M_context->BeforeLock(_Mutex::mark);
if (_M_chk_timeout.count() > 0 && __context.IsCancelable()) {
while (_M_cond_var->wait_for(__lock, _M_chk_timeout) == std::cv_status::timeout) {
AssertOnCancel(__context, "Context was canceled or timed out (condition variable)"sv);
}
} else {
_M_cond_var->wait(__lock);
}
}
// template <typename _Lock, typename _ContextT>
// void wait(_Lock& __lock, const _ContextT& __context) {
// using namespace std::string_view_literals;
// assert(_M_cond_var);
// // const auto lockWard = _M_context->BeforeLock(_Mutex::mark);
// if (_M_chk_timeout.count() > 0 && __context.IsCancelable()) {
// while (_M_cond_var->wait_for(__lock, _M_chk_timeout) == std::cv_status::timeout) {
// AssertOnCancel(__context, "Context was canceled or timed out (condition variable)"sv);
// }
// } else {
// _M_cond_var->wait(__lock);
// }
// }

void notify_all() {
assert(_M_cond_var);
Expand Down

0 comments on commit 01a92b8

Please sign in to comment.