Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
MadSchemas committed Feb 19, 2024
1 parent c74391b commit ce59edc
Show file tree
Hide file tree
Showing 52 changed files with 96 additions and 25,743 deletions.
4 changes: 2 additions & 2 deletions cpp_src/cluster/replication/updatesqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,12 @@ class UpdatesQueue {
logTraceW([&] { rtfmt("Push new sync updates (%d) for %s", localData.dataSize, data[0].GetNsName()); });

std::cout << fmt::sprintf("[cluster:queue] Duplicated: Pushing new sync updates (%d) for %s. Last ID: %d\n", localData.dataSize,
nsName, queue_.size() ? queue_.back()->ID() : 0);
nsName, queue_.size() ? (queue_.back()->ID() + queue_.back()->Count()) : -1);

entriesRange = addDataToQueue(std::move(data), &onResult, dropped);

std::cout << fmt::sprintf("[cluster:queue] Duplicated: Added new sync updates (%d) for %s. Last ID: %d\n", localData.dataSize,
nsName, queue_.size() ? queue_.back()->ID() : 0);
nsName, queue_.size() ? (queue_.back()->ID() + queue_.back()->Count()) : -1);

if (beforeWait) {
beforeWait(); // FIXME: Think about better workaround
Expand Down
12 changes: 10 additions & 2 deletions cpp_src/core/clusterproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include "core/reindexer_impl/reindexerimpl.h"
#include "tools/clusterproxyloghelper.h"

#include "vendor/spdlog/fmt/fmt.h"

namespace reindexer {

#define CallProxyFunction(Fn) proxyCall<decltype(&ReindexerImpl::Fn), &ReindexerImpl::Fn, Error>
Expand Down Expand Up @@ -127,16 +129,22 @@ class ClusterProxy {
return proxyCall<LocalQueryActionFT, &ReindexerImpl::Update, Error>(ctx, q.NsName(), action, q, qr);
}
Error Upsert(std::string_view nsName, Item &item, const RdxContext &ctx) {
std::cout << fmt::sprintf("ClusterProxy::Upsert(...) into '%s' begin\n", nsName);
auto action = [this](const RdxContext &ctx, LeaderRefT clientToLeader, std::string_view nsName, Item &item) {
return itemFollowerAction<&client::Reindexer::Upsert>(ctx, clientToLeader, nsName, item);
};
return proxyCall<LocalItemSimpleActionFT, &ReindexerImpl::Upsert, Error>(ctx, nsName, action, nsName, item);
auto res = proxyCall<LocalItemSimpleActionFT, &ReindexerImpl::Upsert, Error>(ctx, nsName, action, nsName, item);
std::cout << fmt::sprintf("ClusterProxy::Upsert(...) into '%s' done\n", nsName);
return res;
}
Error Upsert(std::string_view nsName, Item &item, LocalQueryResults &qr, const RdxContext &ctx) {
std::cout << fmt::sprintf("ClusterProxy::Upsert(qr) into '%s' begin\n", nsName);
auto action = [this](const RdxContext &ctx, LeaderRefT clientToLeader, std::string_view nsName, Item &item, LocalQueryResults &qr) {
return resultItemFollowerAction<&client::Reindexer::Upsert>(ctx, clientToLeader, nsName, item, qr);
};
return proxyCall<LocalItemQrActionFT, &ReindexerImpl::Upsert, Error>(ctx, nsName, action, nsName, item, qr);
auto res = proxyCall<LocalItemQrActionFT, &ReindexerImpl::Upsert, Error>(ctx, nsName, action, nsName, item, qr);
std::cout << fmt::sprintf("ClusterProxy::Upsert(qr) into '%s' done\n", nsName);
return res;
}
Error Delete(std::string_view nsName, Item &item, const RdxContext &ctx) {
auto action = [this](const RdxContext &ctx, LeaderRefT clientToLeader, std::string_view nsName, Item &item) {
Expand Down
25 changes: 24 additions & 1 deletion cpp_src/core/namespace/namespace.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,20 @@ class Namespace {

CounterGuardAIR32 cg(ns->cancelCommitCnt_);
if constexpr (std::is_same_v<T, Item>) {
auto name = GetName(ctx);
if (enumVal == ModeUpsert && !isSystemNamespaceNameFast(name)) {
std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(qr) into '%s' begin\n", name);
}
auto wlck = ns->dataWLock(nsCtx.rdxContext);
cg.Reset();
qr.AddNamespace(ns, true);
added = true;
(*ns.*fn)(v, enumVal, pendedRepl, nsCtx);
qr.AddItem(v, true, false);
ns->replicate(std::move(pendedRepl), std::move(wlck), true, nullptr, nsCtx);
if (enumVal == ModeUpsert && !isSystemNamespaceNameFast(name)) {
std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(qr) into '%s' done\n", name);
}
} else {
auto params = longUpdDelLoggingParams_.load(std::memory_order_relaxed);
const bool isEnabled = params.thresholdUs >= 0 && !isSystemNamespaceNameFast(v.NsName());
Expand Down Expand Up @@ -127,9 +134,25 @@ class Namespace {
void Update(const Query &query, LocalQueryResults &result, const RdxContext &ctx) {
nsFuncWrapper<&NamespaceImpl::doUpdate, QueryType::QueryUpdate>(query, result, ctx);
}
void Upsert(Item &item, const RdxContext &ctx) { nsFuncWrapper<&NamespaceImpl::Upsert>(item, ctx); }
void Upsert(Item &item, const RdxContext &ctx) {
auto name = GetName(ctx);
if (!isSystemNamespaceNameFast(name)) {
std::cout << fmt::sprintf("Namespace::Upsert(...) into '%s' begin\n", name);
}
nsFuncWrapper<&NamespaceImpl::Upsert>(item, ctx);
if (!isSystemNamespaceNameFast(name)) {
std::cout << fmt::sprintf("Namespace::Upsert(...) into '%s' done\n", name);
}
}
void Upsert(Item &item, LocalQueryResults &qr, const RdxContext &ctx) {
auto name = GetName(ctx);
if (!isSystemNamespaceNameFast(name)) {
std::cout << fmt::sprintf("Namespace::Upsert(qr) into '%s' begin\n", name);
}
nsFuncWrapper<&NamespaceImpl::modifyItem, ItemModifyMode::ModeUpsert>(item, qr, ctx);
if (!isSystemNamespaceNameFast(name)) {
std::cout << fmt::sprintf("Namespace::Upsert(qr) into '%s' done\n", name);
}
}
void Delete(Item &item, const RdxContext &ctx) { nsFuncWrapper<&NamespaceImpl::Delete>(item, ctx); }
void Delete(Item &item, LocalQueryResults &qr, const RdxContext &ctx) {
Expand Down
13 changes: 13 additions & 0 deletions cpp_src/core/namespace/namespaceimpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1478,6 +1478,10 @@ void NamespaceImpl::doTruncate(UpdatesContainer& pendedRepl, const NsContext& ct
}

void NamespaceImpl::ModifyItem(Item& item, ItemModifyMode mode, const RdxContext& ctx) {
auto name = GetName(ctx);
if (mode == ModeUpsert && !isSystemNamespaceNameFast(name)) {
std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(...) into '%s' begin\n", name);
}
PerfStatCalculatorMT calc(updatePerfCounter_, enablePerfCounters_);
UpdatesContainer pendedRepl;

Expand All @@ -1491,6 +1495,9 @@ void NamespaceImpl::ModifyItem(Item& item, ItemModifyMode mode, const RdxContext
modifyItem(item, mode, pendedRepl, NsContext(ctx));

replicate(std::move(pendedRepl), std::move(wlck), true, nullptr, ctx);
if (mode == ModeUpsert && !isSystemNamespaceNameFast(name)) {
std::cout << fmt::sprintf("NamespaceImpl::ModifyItem(...) into '%s' begin\n", name);
}
}

void NamespaceImpl::Truncate(const RdxContext& ctx) {
Expand Down Expand Up @@ -1809,7 +1816,13 @@ void NamespaceImpl::modifyItem(Item& item, ItemModifyMode mode, UpdatesContainer
if (mode == ModeDelete) {
deleteItem(item, pendedRepl, ctx);
} else {
if (mode == ModeUpsert && !isSystemNamespaceNameFast(name_)) {
std::cout << fmt::sprintf("NamespaceImpl::modifyItem() into '%s' begin\n", name_);
}
doModifyItem(item, mode, pendedRepl, ctx);
if (mode == ModeUpsert && !isSystemNamespaceNameFast(name_)) {
std::cout << fmt::sprintf("NamespaceImpl::modifyItem() into '%s' done\n", name_);
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion cpp_src/core/namespace/namespaceimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,9 @@ class NamespaceImpl : public intrusive_atomic_rc_base { // NOLINT(*performance.
QueryStatsCalculatorT &&statCalculator, const NsContext &ctx) {
if (!repl_.temporary) {
assertrx(!ctx.isCopiedNsRequest);
std::cout << fmt::sprintf("Namespace::'%s' replicating %d records\n", name_, recs.size());
if (!isSystem()) {
std::cout << fmt::sprintf("Namespace::'%s' replicating %d records\n", name_, recs.size());
}
auto err = clusterizator_.Replicate(
std::move(recs),
[&wlck]() {
Expand Down
20 changes: 20 additions & 0 deletions cpp_src/core/rdxcontext.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ void ThrowOnCancel(const Context& ctx, std::string_view errMsg = std::string_vie
throw Error(errCanceled, errMsg.empty() ? kDefaultCancelError : errMsg);
}

template <typename Context>
void AssertOnCancel(const Context& ctx, std::string_view errMsg = std::string_view()) {
(void)errMsg;
if (!ctx.IsCancelable()) return;

const auto cancel = ctx.CheckCancel();
switch (cancel) {
case CancelType::Explicit:
assertrx(false);
std::abort();
case CancelType::Timeout:
assertrx(false);
std::abort();
case CancelType::None:
return;
}
assertrx(false);
std::abort();
}

class RdxDeadlineContext : public IRdxCancelContext {
public:
using ClockT = std::chrono::steady_clock;
Expand Down
19 changes: 17 additions & 2 deletions cpp_src/core/reindexer_impl/reindexerimpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,13 @@ Error ReindexerImpl::applyNsFunction(std::string_view nsName, const RdxContext&
return applyNsFunction<needUpdateSys, void(decltype(arg1), decltype(arg2), const RdxContext&), &Namespace::memFn, decltype(arg1), \
decltype(arg2)>(nsName, ctx, arg1, arg2)

#define APPLY_NS_FUNCTION11(needUpdateSys, memFn, arg) \
applyNsFunction<needUpdateSys, void(decltype(arg), const RdxContext&), &Namespace::memFn, decltype(arg)>(nsName, ctx, arg)

#define APPLY_NS_FUNCTION22(needUpdateSys, memFn, arg1, arg2) \
applyNsFunction<needUpdateSys, void(decltype(arg1), decltype(arg2), const RdxContext&), &Namespace::memFn, decltype(arg1), \
decltype(arg2)>(nsName, ctx, arg1, arg2)

Error ReindexerImpl::Insert(std::string_view nsName, Item& item, const RdxContext& ctx) { APPLY_NS_FUNCTION1(true, Insert, item); }

Error ReindexerImpl::insertDontUpdateSystemNS(std::string_view nsName, Item& item, const RdxContext& ctx) {
Expand Down Expand Up @@ -965,10 +972,18 @@ Error ReindexerImpl::Update(const Query& q, LocalQueryResults& result, const Rdx
return errOK;
}

Error ReindexerImpl::Upsert(std::string_view nsName, Item& item, const RdxContext& ctx) { APPLY_NS_FUNCTION1(true, Upsert, item); }
Error ReindexerImpl::Upsert(std::string_view nsName, Item& item, const RdxContext& ctx) {
std::cout << fmt::sprintf("ReindexerImpl::Upsert(...) into '%s' begin\n", nsName);
auto res = APPLY_NS_FUNCTION11(true, Upsert, item);
std::cout << fmt::sprintf("ReindexerImpl::Upsert(...) into '%s' done\n", nsName);
return res;
}

Error ReindexerImpl::Upsert(std::string_view nsName, Item& item, LocalQueryResults& qr, const RdxContext& ctx) {
APPLY_NS_FUNCTION2(true, Upsert, item, qr);
std::cout << fmt::sprintf("ReindexerImpl::Upsert(qr) into '%s' begin\n", nsName);
auto res = APPLY_NS_FUNCTION22(true, Upsert, item, qr);
std::cout << fmt::sprintf("ReindexerImpl::Upsert(qr) into '%s' done\n", nsName);
return res;
}

Item ReindexerImpl::NewItem(std::string_view nsName, const RdxContext& rdxCtx) {
Expand Down
4 changes: 2 additions & 2 deletions cpp_src/estl/contexted_cond_var.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class contexted_cond_var {
// const auto lockWard = _M_context->BeforeLock(_Mutex::mark);
if (_M_chk_timeout.count() > 0 && __context.IsCancelable()) {
while (!_M_cond_var->wait_for(__lock, _M_chk_timeout, __p)) {
ThrowOnCancel(__context, "Context was canceled or timed out (condition variable)"sv);
AssertOnCancel(__context, "Context was canceled or timed out (condition variable)"sv);
}
} else {
_M_cond_var->wait(__lock, std::move(__p));
Expand All @@ -37,7 +37,7 @@ class contexted_cond_var {
// const auto lockWard = _M_context->BeforeLock(_Mutex::mark);
if (_M_chk_timeout.count() > 0 && __context.IsCancelable()) {
while (_M_cond_var->wait_for(__lock, _M_chk_timeout) == std::cv_status::timeout) {
ThrowOnCancel(__context, "Context was canceled or timed out (condition variable)"sv);
AssertOnCancel(__context, "Context was canceled or timed out (condition variable)"sv);
}
} else {
_M_cond_var->wait(__lock);
Expand Down
Loading

0 comments on commit ce59edc

Please sign in to comment.