Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
MadSchemas committed Feb 19, 2024
1 parent d629021 commit 7bd3b34
Show file tree
Hide file tree
Showing 13 changed files with 226 additions and 35 deletions.
5 changes: 4 additions & 1 deletion cpp_src/cluster/logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
namespace reindexer {
namespace cluster {

void Logger::print(LogLevel , std::string& str) const { str.append("\n"); std::cout << str; } ///logPrint(l, &str[0]); }
void Logger::print(LogLevel, std::string& str) const {
str.append("\n");
std::cout << str;
} // logPrint(l, &str[0]); }

} // namespace cluster
} // namespace reindexer
24 changes: 6 additions & 18 deletions cpp_src/cluster/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
#include <string>
#include "core/type_consts.h"

#include <iostream>
#include "tools/assertrx.h"

namespace reindexer {
namespace cluster {

Expand Down Expand Up @@ -46,22 +43,13 @@ class Logger {
}
template <typename F>
void Log(LogLevel l, F&& f) const {
if (l <= GetLevel()) {
try {
std::string str = f();
if (!str.empty()) {
const auto outLevel = minOutputLogLevel_ < l ? minOutputLogLevel_ : l;
print(outLevel, str);
}
} catch (std::exception& e) {
std::cout << "!!!!!" << e.what() << std::endl;
assertrx(false);
}
catch (...) {
std::cout << "!!!!!<unknown error>" << std::endl;
assertrx(false);
}
// if (l <= GetLevel()) {
std::string str = f();
if (!str.empty()) {
const auto outLevel = minOutputLogLevel_ < l ? minOutputLogLevel_ : l;
print(outLevel, str);
}
//}
}

private:
Expand Down
3 changes: 2 additions & 1 deletion cpp_src/cluster/replication/replicationthread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,8 @@ UpdateApplyStatus ReplThread<BehaviourParamT>::nodeUpdatesHandlingLoop(Node& nod
statsCollector_.OnUpdateApplied(node.uid, updatePtr->ID());
return UpdateApplyStatus(Error(), UpdateRecord::Type::ResyncOnUpdatesDrop);
}
logTrace("%d:%d Got new update. Next update id: %d", serverId_, node.uid, node.nextUpdateId);
logTrace("%d:%d Got new update. Next update id: %d. Queue block id: %d, block count: %d", serverId_, node.uid,
node.nextUpdateId, updatePtr->ID(), updatePtr->Count());
node.nextUpdateId = updatePtr->ID() > node.nextUpdateId ? updatePtr->ID() : node.nextUpdateId;
for (uint16_t offset = node.nextUpdateId - updatePtr->ID(); offset < updatePtr->Count(); ++offset) {
if (updatePtr->IsInvalidated()) {
Expand Down
23 changes: 23 additions & 0 deletions cpp_src/cluster/replication/sharedsyncstate.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include "estl/shared_mutex.h"
#include "tools/stringstools.h"

#include "vendor/spdlog/fmt/fmt.h"

namespace reindexer {
namespace cluster {

Expand All @@ -25,16 +27,28 @@ class SharedSyncState {
auto res = synchronized_.emplace(std::move(name));
lck.unlock();
if (res.second) {
std::cout << fmt::sprintf("Marking '%s' as synchronized (with notification)\n", name);
cond_.notify_all();
} else {
std::cout << fmt::sprintf("Marking '%s' as synchronized (no notification)\n", name);
}
} else {
std::cout << fmt::sprintf("Attempt to mark '%s' as synchronized, but current role is %s\n", name,
RaftInfo::RoleToStr(current_.role));
}
}
void MarkSynchronized() {
std::unique_lock<MtxT> lck(mtx_);
if (current_.role == RaftInfo::Role::Leader) {
++initialSyncDoneCnt_;

std::cout << fmt::sprintf("Marking 'whole DB' as synchronized (with notification); initialSyncDoneCnt_ = %d\n",
initialSyncDoneCnt_);
lck.unlock();
cond_.notify_all();
} else {
std::cout << fmt::sprintf("Attempt to mark 'whole DB' as synchronized, but current role is %s\n",
RaftInfo::RoleToStr(current_.role));
}
}
void Reset(ContainerT requireSynchronization, size_t ReplThreadsCnt, bool enabled) {
Expand All @@ -47,6 +61,7 @@ class SharedSyncState {
ReplThreadsCnt_ = ReplThreadsCnt;
next_ = current_ = RaftInfo();
assert(ReplThreadsCnt_);
std::cout << fmt::sprintf("Reseting sync state\n");
}
template <typename ContextT>
void AwaitInitialSync(std::string_view name, const ContextT& ctx) const {
Expand All @@ -60,7 +75,9 @@ class SharedSyncState {
if (next_.role == RaftInfo::Role::Follower) {
throw Error(errWrongReplicationData, "Node role was changed to follower");
}
std::cout << fmt::sprintf("Initial sync is not done for '%s', awaiting...\n", name);
cond_.wait(lck, ctx);
std::cout << fmt::sprintf("Initial sync is done for '%s'!\n", name);
}
}
template <typename ContextT>
Expand All @@ -73,7 +90,9 @@ class SharedSyncState {
if (next_.role == RaftInfo::Role::Follower) {
throw Error(errWrongReplicationData, "Node role was changed to follower");
}
std::cout << fmt::sprintf("Initial sync is not done for 'whole DB', awaiting...\n");
cond_.wait(lck, ctx);
std::cout << fmt::sprintf("Initial sync is done for 'whole DB'!\n");
}
}
bool IsInitialSyncDone(std::string_view name) const {
Expand All @@ -90,10 +109,12 @@ class SharedSyncState {
std::unique_lock<MtxT> lck(mtx_);
if (expected == next_) {
if (current_.role == RaftInfo::Role::Leader && current_.role != next_.role) {
std::cout << fmt::sprintf("Clearing synchronized list on role switch\n");
synchronized_.clear();
initialSyncDoneCnt_ = 0;
}
current_ = next_;
std::cout << fmt::sprintf("Role transition done, sending notification!\n");
lck.unlock();
cond_.notify_all();
return expected;
Expand All @@ -107,13 +128,15 @@ class SharedSyncState {
cond_.wait(
lck, [this] { return !isRunning() || next_ == current_; }, ctx);
} else {
std::cout << fmt::sprintf("Awaiting role transition... Current role is %s\n", RaftInfo::RoleToStr(current_.role));
cond_.wait(
lck,
[this] {
return !isRunning() ||
(next_ == current_ && (current_.role == RaftInfo::Role::Leader || current_.role == RaftInfo::Role::Follower));
},
ctx);
std::cout << fmt::sprintf("Role transition done! Current role is %s\n", RaftInfo::RoleToStr(current_.role));
}
return current_;
}
Expand Down
3 changes: 2 additions & 1 deletion cpp_src/core/ft/ft_fast/dataprocessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "core/ft/numtotext.h"
#include "core/ft/typos.h"

#include "tools/hardware_concurrency.h"
#include "tools/logger.h"
#include "tools/serializer.h"
#include "tools/stringstools.h"
Expand Down Expand Up @@ -131,7 +132,7 @@ std::vector<WordIdType> DataProcessor<IdCont>::BuildSuffix(words_map &words_um,

template <typename IdCont>
size_t DataProcessor<IdCont>::buildWordsMap(words_map &words_um) {
uint32_t maxIndexWorkers = multithread_ ? std::thread::hardware_concurrency() : 1;
uint32_t maxIndexWorkers = multithread_ ? hardware_concurrency() : 1;
if (!maxIndexWorkers) {
maxIndexWorkers = 1;
} else if (maxIndexWorkers > 8) {
Expand Down
9 changes: 4 additions & 5 deletions cpp_src/core/namespace/namespaceimpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "tools/errors.h"
#include "tools/flagguard.h"
#include "tools/fsops.h"
#include "tools/hardware_concurrency.h"
#include "tools/logger.h"
#include "tools/timetools.h"
#include "wal/walselecter.h"
Expand Down Expand Up @@ -181,7 +182,7 @@ NamespaceImpl::~NamespaceImpl() {
static constexpr double kDeleteNs = 0.25;

const double k = dbDestroyed_.load(std::memory_order_relaxed) ? kDeleteRxDestroy : kDeleteNs;
threadsCount = k * std::thread::hardware_concurrency();
threadsCount = k * hardware_concurrency();
if (threadsCount > indexes_.size() + 1) {
threadsCount = indexes_.size() + 1;
}
Expand Down Expand Up @@ -1982,7 +1983,7 @@ void NamespaceImpl::checkUniquePK(const ConstPayload& cpl, bool inTransaction, c
}

void NamespaceImpl::optimizeIndexes(const NsContext& ctx) {
static const auto kHardwareConcurrency = std::thread::hardware_concurrency();
static const auto kHardwareConcurrency = hardware_concurrency();
// This is read lock only atomics based implementation of rebuild indexes
// If optimizationState_ == OptimizationCompleted is true, then indexes are completely built.
// In this case reset optimizationState_ and/or any idset's and sort orders builds are allowed only protected by write lock
Expand Down Expand Up @@ -2022,9 +2023,7 @@ void NamespaceImpl::optimizeIndexes(const NsContext& ctx) {

// Update sort orders and sort_id for each index
size_t currentSortId = 1;
const size_t maxIndexWorkers = kHardwareConcurrency
? std::min<size_t>(std::thread::hardware_concurrency(), config_.optimizationSortWorkers)
: config_.optimizationSortWorkers;
const size_t maxIndexWorkers = std::min<size_t>(kHardwareConcurrency, config_.optimizationSortWorkers);
for (auto& idxIt : indexes_) {
if (idxIt->IsOrdered() && maxIndexWorkers != 0) {
NSUpdateSortedContext sortCtx(*this, currentSortId++);
Expand Down
3 changes: 2 additions & 1 deletion cpp_src/core/reindexer_impl/reindexerimpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "tools/catch_and_return.h"
#include "tools/errors.h"
#include "tools/fsops.h"
#include "tools/hardware_concurrency.h"
#include "tools/logger.h"

#include "debug/backtrace.h"
Expand All @@ -51,7 +52,7 @@ constexpr char kActionConfigType[] = "action";
constexpr unsigned kStorageLoadingThreads = 6;

static unsigned ConcurrentNamespaceLoaders() noexcept {
const auto hwConc = std::thread::hardware_concurrency();
const auto hwConc = hardware_concurrency();
if (hwConc <= 4) {
return 1;
} else if (hwConc < 8) { // '<' is not a typo
Expand Down
76 changes: 76 additions & 0 deletions cpp_src/events/event_subscriber_config.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include "event_subscriber_config.h"
#include "core/cjson/jsonbuilder.h"

namespace reindexer {

using namespace std::string_view_literals;

Error EventSubscriberConfig::FromJSON(span<char> json) {
try {
FromJSON(gason::JsonParser().Parse(json));
} catch (const gason::Exception &ex) {
return Error(errParseJson, "UpdatesFilter: %s", ex.what());
} catch (const Error &err) {
return err;
}
return {};
}

void EventSubscriberConfig::FromJSON(const gason::JsonNode &root) {
formatVersion_ = root["version"sv].As<int>(-1);
if (formatVersion_ < kMinSubscribersConfigFormatVersion) {
throw Error(errParams, "Min supported subscribers config format version is %d, but %d version was found in JSON",
kMinSubscribersConfigFormatVersion, formatVersion_)
}
streams_.clear();
streams_.resize(kMaxStreamsPerSub);

withDBName_ = root["with_db_name"sv].As<bool>(false);
for (const auto &stream : root["streams"sv]) {
const int id = stream["id"].As<int>(-1);
if (id < 0 || id >= streams_.size()) {
throw Error(errParams, "Stream ID %d is out of range", id);
}
if (streams_[id].has_value()) {
throw Error(errParams, "Stream ID %d is duplicated", id);
}

auto &s = streams_[id].emplace();
s.withConfigNamespace = stream["with_config_namespace"sv].As<bool>(false);
for (const auto &ns : stream["namespaces"sv]) {
auto name = ns["name"sv].As<std::string_view>();
for (const auto &f : ns["filters"sv]) {
UpdatesFilters::Filter filter;
filter.FromJSON(f);
s.filters.AddFilter(name, std::move(filter));
}
}
}
}

void EventSubscriberConfig::GetJSON(WrSerializer &ser) const {
JsonBuilder builder(ser);
{
builder.Put("version"sv, formatVersion_);
builder.Put("with_config_namespace"sv, withDBName_);
auto streamArr = builder.Array("streams"sv);
for (size_t i = 0; i < streams_.size(); ++i) {
auto streamObj = streamArr.Object();
if (streams_[i].has_value()) {
streamObj.Put("with_config_namespace"sv, streams_[i]->withConfigNamespace);
auto nsArr = streamObj.Array("namespaces"sv);
for (const auto &nsFilters : streams_[i]->filters) {
auto obj = nsArr.Object();
obj.Put("name"sv, nsFilters.first);
auto arrFilters = obj.Array("filters"sv);
for (const auto &filter : nsFilters.second) {
auto filtersObj = arrFilters.Object();
filter.GetJSON(filtersObj);
}
}
}
}
}
}

} // namespace reindexer
79 changes: 79 additions & 0 deletions cpp_src/events/event_subscriber_config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#pragma once

#include <optional>
#include "core/type_consts.h"
#include "estl/fast_hash_map.h"
#include "estl/span.h"
#include "tools/errors.h"
#include "tools/stringstools.h"

namespace gason {
struct JsonNode;
}

namespace reindexer {

class JsonBuilder;

/// Object of this class contains filters set. Filters are separated by namespace and concatenated with disjunction
class UpdatesFilters {
public:
class Filter {
public:
// TODO: Any additional condition check should be added here
bool Check() const { return true; }
void FromJSON(const gason::JsonNode &) {}
void GetJSON(JsonBuilder &) const {}

bool operator==(const Filter &) const { return true; }
};

/// Merge two filters sets
/// If one of the filters set is empty, result filters set will also be empty
/// If one of the filters set contains some conditions for specific namespace,
/// then result filters set will also contain this conditions
/// @param rhs - Another filters set
void Merge(const UpdatesFilters &rhs);
/// Add new filter for specified namespace. Doesn't merge filters, just concatenates it into disjunction sequence
/// @param ns - Namespace
/// @param filter - Filter to add
void AddFilter(std::string_view ns, Filter filter);
/// Check if filters set allows this namespace
/// @param ns - Namespace
/// @return 'true' if filter's conditions are satisfied
bool Check(std::string_view ns) const;

Error FromJSON(span<char> json);
void FromJSON(const gason::JsonNode &root);
void GetJSON(WrSerializer &ser) const;

bool operator==(const UpdatesFilters &rhs) const;

private:
using FiltersList = h_vector<Filter, 4>;

fast_hash_map<std::string, FiltersList, nocase_hash_str, nocase_equal_str, nocase_less_str> filters_;
};

class EventSubscriberConfig {
public:
struct StreamConfig {
UpdatesFilters filters;
bool withConfigNamespace = false;
};
using StreamsContainerT = std::vector<std::optional<StreamConfig>>;

Error FromJSON(span<char> json);
void FromJSON(const gason::JsonNode &root);
void GetJSON(WrSerializer &ser) const;

const StreamsContainerT &Streams() const noexcept { return streams_; }
bool WithDBName() const noexcept { return withDBName_; }

private:
int formatVersion_ = kSubscribersConfigFormatVersion;
bool withDBName_ = false;
StreamsContainerT streams_;
};

} // namespace reindexer
Loading

0 comments on commit 7bd3b34

Please sign in to comment.