Skip to content

Commit

Permalink
test (logs)
Browse files Browse the repository at this point in the history
  • Loading branch information
MadSchemas committed Feb 19, 2024
1 parent 7bd3b34 commit c74391b
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 7 deletions.
4 changes: 4 additions & 0 deletions cpp_src/cluster/clusterizator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,11 @@ Error Clusterizator::Replicate(UpdatesContainer&& recs, std::function<void()> be

std::pair<Error, bool> res;
if (ctx.GetOriginLSN().isEmpty()) {
auto recsCnt = recs.size();
std::cout << fmt::sprintf("Clusterizator::'%s' pushing %d records into queue\n", recs[0].GetNsName(), recsCnt);
res = updatesQueue_.Push(std::move(recs), std::move(beforeWaitF), ctx);
std::cout << fmt::sprintf("Clusterizator::'%s' replicated %d records. Result: %s\n", recs[0].GetNsName(), recsCnt,
res.first.ok() ? "OK" : res.first.what());
} else {
// Update can't be replicated to cluster from another node, so may only be replicated to async replicas
res = updatesQueue_.PushAsync(std::move(recs));
Expand Down
2 changes: 2 additions & 0 deletions cpp_src/cluster/replication/replicationthread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -728,12 +728,14 @@ UpdateApplyStatus ReplThread<BehaviourParamT>::nodeUpdatesHandlingLoop(Node& nod
auto& it = upd.Data();
if (it.IsNetworkCheckRecord()) {
[[maybe_unused]] bool v;
logInfo("%d:%d Handling network check record with id %d", serverId_, node.uid, node.nextUpdateId);
std::tie(v, res) = handleNetworkCheckRecord(node, updatePtr, offset, true, it);
if (!res.err.ok()) {
break;
}
continue;
}
logInfo("%d:%d Handling %d record with id %d", serverId_, node.uid, int(it.type), node.nextUpdateId);
const std::string& nsName = it.GetNsName();
if constexpr (!isClusterReplThread()) {
if (!bhvParam_.IsNamespaceInConfig(node.uid, nsName)) {
Expand Down
10 changes: 10 additions & 0 deletions cpp_src/cluster/replication/updatesqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
#include "tools/errors.h"
#include "tools/stringstools.h"

#include <iostream>
#include "vendor/spdlog/fmt/fmt.h"

namespace reindexer {
namespace cluster {

Expand Down Expand Up @@ -271,10 +274,17 @@ class UpdatesQueue {
return std::make_pair(invalidationErr_, false);
}
try {
auto nsName = data[0].GetNsName();
logTraceW([&] { rtfmt("Push new sync updates (%d) for %s", localData.dataSize, data[0].GetNsName()); });

std::cout << fmt::sprintf("[cluster:queue] Duplicated: Pushing new sync updates (%d) for %s. Last ID: %d\n", localData.dataSize,
nsName, queue_.size() ? queue_.back()->ID() : 0);

entriesRange = addDataToQueue(std::move(data), &onResult, dropped);

std::cout << fmt::sprintf("[cluster:queue] Duplicated: Added new sync updates (%d) for %s. Last ID: %d\n", localData.dataSize,
nsName, queue_.size() ? queue_.back()->ID() : 0);

if (beforeWait) {
beforeWait(); // FIXME: Think about better workaround
}
Expand Down
7 changes: 7 additions & 0 deletions cpp_src/core/namespace/namespaceimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ class NamespaceImpl : public intrusive_atomic_rc_base { // NOLINT(*performance.
const bool isFollowerNS = owner_.repl_.clusterStatus.role == ClusterizationStatus::Role::SimpleReplica ||
owner_.repl_.clusterStatus.role == ClusterizationStatus::Role::ClusterReplica;
bool synchronized = isFollowerNS || !requireSync || clusterizator_.IsInitialSyncDone(owner_.name_);
bool awaitingSync = !synchronized;
while (!synchronized) {
// This is required in case of rename during sync wait
auto name = owner_.name_;
Expand All @@ -272,12 +273,17 @@ class NamespaceImpl : public intrusive_atomic_rc_base { // NOLINT(*performance.
lck.lock();
checkInvalidation();
synchronized = clusterizator_.IsInitialSyncDone(owner_.name_);
std::cout << fmt::sprintf("'%s' is %s syncronized!\n", owner_.name_, synchronized ? "" : "not");
}

if (!skipClusterStatusCheck) {
owner_.checkClusterStatus(ctx); // throw exception if false
}

if (awaitingSync) {
std::cout << fmt::sprintf("'%s' got lock after sync\n", owner_.name_);
}

return lck;
}
WLockT SimpleWLock(const RdxContext &ctx) const {
Expand Down Expand Up @@ -581,6 +587,7 @@ class NamespaceImpl : public intrusive_atomic_rc_base { // NOLINT(*performance.
QueryStatsCalculatorT &&statCalculator, const NsContext &ctx) {
if (!repl_.temporary) {
assertrx(!ctx.isCopiedNsRequest);
std::cout << fmt::sprintf("Namespace::'%s' replicating %d records\n", name_, recs.size());
auto err = clusterizator_.Replicate(
std::move(recs),
[&wlck]() {
Expand Down
8 changes: 1 addition & 7 deletions cpp_src/tools/hardware_concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,9 @@

#include <thread>

#include <iostream>
#include "vendor/spdlog/fmt/fmt.h"

namespace reindexer {

// Wrapper to handle situation, when std::thread::hardware_concurrency returns 0.
inline unsigned hardware_concurrency() noexcept {
std::cout << fmt::sprintf("std::hardware_concurrency: %d\n", std::thread::hardware_concurrency());
return std::max(std::thread::hardware_concurrency(), 1u);
}
inline unsigned hardware_concurrency() noexcept { return std::max(std::thread::hardware_concurrency(), 1u); }

} // namespace reindexer

0 comments on commit c74391b

Please sign in to comment.