diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 66e2add90..8c9ffe33c 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -33,6 +33,7 @@ jobs:
           if [[ $OS == ubuntu-latest ]]; then
             cmake -DENABLE_GRPC=ON -DWITH_${{matrix.sanitizer}}=On ..
           elif [[ $OS == macos* ]]; then
+            brew install libunwind
             cmake -DGH_CI_OSX=ON ..
           else
             cmake -DENABLE_GRPC=ON ..
diff --git a/cpp_src/cluster/replication/sharedsyncstate.h b/cpp_src/cluster/replication/sharedsyncstate.h
index 4c3b13a1b..2e7638828 100644
--- a/cpp_src/cluster/replication/sharedsyncstate.h
+++ b/cpp_src/cluster/replication/sharedsyncstate.h
@@ -22,18 +22,19 @@ class SharedSyncState {
 	using ContainerT = fast_hash_set;
 	void MarkSynchronized(std::string name) {
+		const std::string n = std::move(name);
 		std::unique_lock lck(mtx_);
 		if (current_.role == RaftInfo::Role::Leader) {
-			auto res = synchronized_.emplace(std::move(name));
+			auto res = synchronized_.emplace(std::move(n));
 			lck.unlock();
 			if (res.second) {
-				std::cout << fmt::sprintf("Marking '%s' as synchronized (with notification)\n", name);
+				std::cout << fmt::sprintf("Marking '%s' as synchronized (with notification)\n", n);
 				cond_.notify_all();
 			} else {
-				std::cout << fmt::sprintf("Marking '%s' as synchronized (no notification)\n", name);
+				std::cout << fmt::sprintf("Marking '%s' as synchronized (no notification)\n", n);
 			}
 		} else {
-			std::cout << fmt::sprintf("Attempt to mark '%s' as synchronized, but current role is %s\n", name,
+			std::cout << fmt::sprintf("Attempt to mark '%s' as synchronized, but current role is %s\n", n,
									  RaftInfo::RoleToStr(current_.role));
 		}
 	}
@@ -75,9 +76,24 @@ class SharedSyncState {
 			if (next_.role == RaftInfo::Role::Follower) {
 				throw Error(errWrongReplicationData, "Node role was changed to follower");
 			}
-			std::cout << fmt::sprintf("Initial sync is not done for '%s', awaiting...\n", name);
-			cond_.wait(lck, ctx);
-			std::cout << fmt::sprintf("Initial sync is done for '%s'!\n", name);
+			std::cout << fmt::sprintf("Initial sync is not done for '%s', TID: %s, hash: %d; Awaiting...\n", name,
+									  std::this_thread::get_id(), hash);
+			try {
+				cond_.wait(
+					lck,
+					[this, &name, hash]() noexcept {
+						auto res = isInitialSyncDone(name, hash) || terminated_ || next_.role == RaftInfo::Role::Follower;
+						nocase_hash_str h;
+						std::cout << fmt::sprintf("AwaitInitialSync(%s / %d / %d) lambda call: %d, terminated_: %d, next_.role: %d\n", name,
+												  hash, h(name), int(res), int(terminated_), int(next_.role));
+						return res;
+					},
+					ctx);
+			} catch (...) {
+				std::cout << fmt::sprintf("!!!Exception in AwaitInitialSync(%s)\n", name);
+				throw;
+			}
+			std::cout << fmt::sprintf("Initial sync is done for '%s', TID: %s, hash: %d!\n", name, std::this_thread::get_id(), hash);
 		}
 	}
 	template
@@ -91,7 +107,20 @@ class SharedSyncState {
 				throw Error(errWrongReplicationData, "Node role was changed to follower");
 			}
 			std::cout << fmt::sprintf("Initial sync is not done for 'whole DB', awaiting...\n");
-			cond_.wait(lck, ctx);
+			try {
+				cond_.wait(
+					lck,
+					[this]() noexcept {
+						auto res = isInitialSyncDone() || terminated_ || next_.role == RaftInfo::Role::Follower;
+						std::cout << fmt::sprintf("AwaitInitialSync() lambda call: %d, terminated_: %d, next_.role: %d\n", int(res),
+												  int(terminated_), int(next_.role));
+						return res;
+					},
+					ctx);
+			} catch (...) {
+				std::cout << "!!!Exception in AwaitInitialSync()\n";
+				throw;
+			}
 			std::cout << fmt::sprintf("Initial sync is done for 'whole DB'!\n");
 		}
 	}
@@ -125,17 +154,27 @@ class SharedSyncState {
 	RaftInfo AwaitRole(bool allowTransitState, const ContextT& ctx) const {
 		shared_lock lck(mtx_);
 		if (allowTransitState) {
-			cond_.wait(
-				lck, [this] { return !isRunning() || next_ == current_; }, ctx);
+			try {
+				cond_.wait(
+					lck, [this] { return !isRunning() || next_ == current_; }, ctx);
+			} catch (...) {
+				std::cout << fmt::sprintf("!!!Exception in AwaitRole(allowTransitState=true)\n");
+				throw;
+			}
 		} else {
 			std::cout << fmt::sprintf("Awaiting role transition... Current role is %s\n", RaftInfo::RoleToStr(current_.role));
-			cond_.wait(
-				lck,
-				[this] {
-					return !isRunning() ||
-						   (next_ == current_ && (current_.role == RaftInfo::Role::Leader || current_.role == RaftInfo::Role::Follower));
-				},
-				ctx);
+			try {
+				cond_.wait(
+					lck,
+					[this] {
+						return !isRunning() || (next_ == current_ &&
+												(current_.role == RaftInfo::Role::Leader || current_.role == RaftInfo::Role::Follower));
+					},
+					ctx);
+			} catch (...) {
+				std::cout << fmt::sprintf("!!!Exception in AwaitRole(allowTransitState=false)\n");
+				throw;
+			}
 			std::cout << fmt::sprintf("Role transition done! Current role is %s\n", RaftInfo::RoleToStr(current_.role));
 		}
 		return current_;
diff --git a/cpp_src/cluster/replication/updatesqueue.h b/cpp_src/cluster/replication/updatesqueue.h
index 4a8c54558..ffd0cd64f 100644
--- a/cpp_src/cluster/replication/updatesqueue.h
+++ b/cpp_src/cluster/replication/updatesqueue.h
@@ -294,9 +294,15 @@ class UpdatesQueue {
 				lck.lock();
 			}
 			static RdxContext dummyCtx_;
-			condResultReady_.wait(
-				lck, [&localData] { return localData.executedCnt == localData.dataSize; },
-				dummyCtx_);  // Don't pass cancel context here, because data are already on the leader and we have to handle them
+			try {
+				condResultReady_.wait(
+					lck, [&localData] { return localData.executedCnt == localData.dataSize; },
+					dummyCtx_);  // Don't pass cancel context here, because data are already on the leader and we have to handle them
+			} catch (...) {
+				std::cout << "!!!Exception in PushAndWait\n";
+				throw;
+			}
+
 			return std::make_pair(std::move(localData.err), true);
 		} catch (...) {
 			logInfoW([] { return "PushAndWait call has recieved an exception"; });
diff --git a/cpp_src/core/namespace/namespaceimpl.cc b/cpp_src/core/namespace/namespaceimpl.cc
index 75de46adc..f5acacd6d 100644
--- a/cpp_src/core/namespace/namespaceimpl.cc
+++ b/cpp_src/core/namespace/namespaceimpl.cc
@@ -1480,13 +1480,17 @@ void NamespaceImpl::doTruncate(UpdatesContainer& pendedRepl, const NsContext& ct
 void NamespaceImpl::ModifyItem(Item& item, ItemModifyMode mode, const RdxContext& ctx) {
 	auto name = GetName(ctx);
 	if (mode == ModeUpsert && !isSystemNamespaceNameFast(name)) {
-		std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(...) into '%s' begin\n", name);
+		std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(...) into %d:'%s' begin\n", wal_.GetServer(), name);
 	}
 	PerfStatCalculatorMT calc(updatePerfCounter_, enablePerfCounters_);
 	UpdatesContainer pendedRepl;
 	CounterGuardAIR32 cg(cancelCommitCnt_);
 	auto wlck = dataWLock(ctx);
+
+	if (mode == ModeUpsert && !isSystemNamespaceNameFast(name)) {
+		std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(...) into %d:'%s' locked\n", wal_.GetServer(), name);
+	}
 	cg.Reset();
 	calc.LockHit();
 	if (mode == ModeDelete && rx_unlikely(item.PkFields() != pkFields())) {
@@ -1494,9 +1498,14 @@ void NamespaceImpl::ModifyItem(Item& item, ItemModifyMode mode, const RdxContext
 	}
 	modifyItem(item, mode, pendedRepl, NsContext(ctx));
+	if (mode == ModeUpsert && !isSystemNamespaceNameFast(name)) {
+		std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(...) into %d:'%s' replicate call with %d recs\n", wal_.GetServer(), name,
+								  pendedRepl.size());
+	}
+
 	replicate(std::move(pendedRepl), std::move(wlck), true, nullptr, ctx);
 	if (mode == ModeUpsert && !isSystemNamespaceNameFast(name)) {
-		std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(...) into '%s' begin\n", name);
+		std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(...) into %d:'%s' end\n", wal_.GetServer(), name);
 	}
 }
diff --git a/cpp_src/core/namespace/namespaceimpl.h b/cpp_src/core/namespace/namespaceimpl.h
index 5d51a3d94..74414ba52 100644
--- a/cpp_src/core/namespace/namespaceimpl.h
+++ b/cpp_src/core/namespace/namespaceimpl.h
@@ -241,6 +241,10 @@ class NamespaceImpl : public intrusive_atomic_rc_base {	// NOLINT(*performance.
 		NsWLock() = default;
 		NsWLock(MutexType &mtx, const RdxContext &ctx, bool isCL) : impl_(mtx, ctx), isClusterLck_(isCL) {}
+		NsWLock(const NsWLock &) = delete;
+		NsWLock(NsWLock &&) = default;
+		NsWLock &operator=(const NsWLock &) = delete;
+		NsWLock &operator=(NsWLock &&) = default;
 		void lock() { impl_.lock(); }
 		void unlock() { impl_.unlock(); }
 		bool owns_lock() const { return impl_.owns_lock(); }
@@ -273,7 +277,7 @@ class NamespaceImpl : public intrusive_atomic_rc_base {	// NOLINT(*performance.
 				lck.lock();
 				checkInvalidation();
 				synchronized = clusterizator_.IsInitialSyncDone(owner_.name_);
-				std::cout << fmt::sprintf("'%s' is %s syncronized!\n", owner_.name_, synchronized ? "" : "not");
+				std::cout << fmt::sprintf("%d:'%s' is %s syncronized!\n", owner_.wal_.GetServer(), owner_.name_, synchronized ? "" : "not");
 			}
 			if (!skipClusterStatusCheck) {
@@ -281,7 +285,7 @@ class NamespaceImpl : public intrusive_atomic_rc_base {	// NOLINT(*performance.
 			}
 			if (awaitingSync) {
-				std::cout << fmt::sprintf("'%s' got lock after sync\n", owner_.name_);
+				std::cout << fmt::sprintf("%d:'%s' got lock after sync\n", owner_.wal_.GetServer(), owner_.name_);
 			}
 			return lck;
@@ -590,8 +594,12 @@ class NamespaceImpl : public intrusive_atomic_rc_base {	// NOLINT(*performance.
 						   QueryStatsCalculatorT &&statCalculator, const NsContext &ctx) {
 		if (!repl_.temporary) {
 			assertrx(!ctx.isCopiedNsRequest);
+			const auto recsSize = recs.size();
+			const auto name = name_;
+			const auto invState = int(locker_.InvalidationType().load());
+			const auto sid = wal_.GetServer();
 			if (!isSystem()) {
-				std::cout << fmt::sprintf("Namespace::'%s' replicating %d records\n", name_, recs.size());
+				std::cout << fmt::sprintf("Namespace::%d:'%s' replicating %d records. Inv state: %d\n", sid, name, recsSize, invState);
 			}
 			auto err = clusterizator_.Replicate(
 				std::move(recs),
@@ -600,6 +608,10 @@ class NamespaceImpl : public intrusive_atomic_rc_base {	// NOLINT(*performance.
 					wlck.unlock();
 				},
 				ctx.rdxContext);
+			if (!isSystem()) {
+				std::cout << fmt::sprintf("Namespace::%d:'%s' replication done for %d records. Inv state: %d\n", sid, name, recsSize,
Inv state: %d\n", sid, name, recsSize, + invState); + } if constexpr (std::is_same_v) { storage_.TryForceFlush(); } else { diff --git a/cpp_src/estl/contexted_cond_var.h b/cpp_src/estl/contexted_cond_var.h index 6affd486e..862d1c0db 100644 --- a/cpp_src/estl/contexted_cond_var.h +++ b/cpp_src/estl/contexted_cond_var.h @@ -30,19 +30,19 @@ class contexted_cond_var { } } - template - void wait(_Lock& __lock, const _ContextT& __context) { - using namespace std::string_view_literals; - assert(_M_cond_var); - // const auto lockWard = _M_context->BeforeLock(_Mutex::mark); - if (_M_chk_timeout.count() > 0 && __context.IsCancelable()) { - while (_M_cond_var->wait_for(__lock, _M_chk_timeout) == std::cv_status::timeout) { - AssertOnCancel(__context, "Context was canceled or timed out (condition variable)"sv); - } - } else { - _M_cond_var->wait(__lock); - } - } + // template + // void wait(_Lock& __lock, const _ContextT& __context) { + // using namespace std::string_view_literals; + // assert(_M_cond_var); + // // const auto lockWard = _M_context->BeforeLock(_Mutex::mark); + // if (_M_chk_timeout.count() > 0 && __context.IsCancelable()) { + // while (_M_cond_var->wait_for(__lock, _M_chk_timeout) == std::cv_status::timeout) { + // AssertOnCancel(__context, "Context was canceled or timed out (condition variable)"sv); + // } + // } else { + // _M_cond_var->wait(__lock); + // } + // } void notify_all() { assert(_M_cond_var);