From 7bd3b342b727330fe576abec999e7e886dc8f8a0 Mon Sep 17 00:00:00 2001 From: madschemas <155993105+MadSchemas@users.noreply.github.com> Date: Mon, 19 Feb 2024 12:18:01 +0300 Subject: [PATCH] test --- cpp_src/cluster/logger.cc | 5 +- cpp_src/cluster/logger.h | 24 ++---- .../cluster/replication/replicationthread.cc | 3 +- cpp_src/cluster/replication/sharedsyncstate.h | 23 ++++++ cpp_src/core/ft/ft_fast/dataprocessor.cc | 3 +- cpp_src/core/namespace/namespaceimpl.cc | 9 +-- cpp_src/core/reindexer_impl/reindexerimpl.cc | 3 +- cpp_src/events/event_subscriber_config.cc | 76 ++++++++++++++++++ cpp_src/events/event_subscriber_config.h | 79 +++++++++++++++++++ cpp_src/gtests/tests/unit/rpcclient_test.cc | 3 +- cpp_src/net/listener.cc | 4 +- cpp_src/tools/hardware_concurrency.h | 16 ++++ cpp_src/vendor/sparse-map/sparse_hash.h | 13 +-- 13 files changed, 226 insertions(+), 35 deletions(-) create mode 100644 cpp_src/events/event_subscriber_config.cc create mode 100644 cpp_src/events/event_subscriber_config.h create mode 100644 cpp_src/tools/hardware_concurrency.h diff --git a/cpp_src/cluster/logger.cc b/cpp_src/cluster/logger.cc index bdc94b8e4..7dc3bc328 100644 --- a/cpp_src/cluster/logger.cc +++ b/cpp_src/cluster/logger.cc @@ -6,7 +6,10 @@ namespace reindexer { namespace cluster { -void Logger::print(LogLevel , std::string& str) const { str.append("\n"); std::cout << str; } ///logPrint(l, &str[0]); } +void Logger::print(LogLevel, std::string& str) const { + str.append("\n"); + std::cout << str; +} // logPrint(l, &str[0]); } } // namespace cluster } // namespace reindexer diff --git a/cpp_src/cluster/logger.h b/cpp_src/cluster/logger.h index f398fee22..4ea2d8b91 100644 --- a/cpp_src/cluster/logger.h +++ b/cpp_src/cluster/logger.h @@ -4,9 +4,6 @@ #include #include "core/type_consts.h" -#include -#include "tools/assertrx.h" - namespace reindexer { namespace cluster { @@ -46,22 +43,13 @@ class Logger { } template void Log(LogLevel l, F&& f) const { - if (l <= GetLevel()) { - try { - std::string str = f(); - if (!str.empty()) { - const auto outLevel = minOutputLogLevel_ < l ? minOutputLogLevel_ : l; - print(outLevel, str); - } - } catch (std::exception& e) { - std::cout << "!!!!!" << e.what() << std::endl; - assertrx(false); - } - catch (...) { - std::cout << "!!!!!" << std::endl; - assertrx(false); - } + // if (l <= GetLevel()) { + std::string str = f(); + if (!str.empty()) { + const auto outLevel = minOutputLogLevel_ < l ? minOutputLogLevel_ : l; + print(outLevel, str); } + //} } private: diff --git a/cpp_src/cluster/replication/replicationthread.cc b/cpp_src/cluster/replication/replicationthread.cc index e7a6d79f4..5c17fa9ec 100644 --- a/cpp_src/cluster/replication/replicationthread.cc +++ b/cpp_src/cluster/replication/replicationthread.cc @@ -715,7 +715,8 @@ UpdateApplyStatus ReplThread::nodeUpdatesHandlingLoop(Node& nod statsCollector_.OnUpdateApplied(node.uid, updatePtr->ID()); return UpdateApplyStatus(Error(), UpdateRecord::Type::ResyncOnUpdatesDrop); } - logTrace("%d:%d Got new update. Next update id: %d", serverId_, node.uid, node.nextUpdateId); + logTrace("%d:%d Got new update. Next update id: %d. Queue block id: %d, block count: %d", serverId_, node.uid, + node.nextUpdateId, updatePtr->ID(), updatePtr->Count()); node.nextUpdateId = updatePtr->ID() > node.nextUpdateId ? updatePtr->ID() : node.nextUpdateId; for (uint16_t offset = node.nextUpdateId - updatePtr->ID(); offset < updatePtr->Count(); ++offset) { if (updatePtr->IsInvalidated()) { diff --git a/cpp_src/cluster/replication/sharedsyncstate.h b/cpp_src/cluster/replication/sharedsyncstate.h index 5df53eb2e..4c3b13a1b 100644 --- a/cpp_src/cluster/replication/sharedsyncstate.h +++ b/cpp_src/cluster/replication/sharedsyncstate.h @@ -8,6 +8,8 @@ #include "estl/shared_mutex.h" #include "tools/stringstools.h" +#include "vendor/spdlog/fmt/fmt.h" + namespace reindexer { namespace cluster { @@ -25,16 +27,28 @@ class SharedSyncState { auto res = synchronized_.emplace(std::move(name)); lck.unlock(); if (res.second) { + std::cout << fmt::sprintf("Marking '%s' as synchronized (with notification)\n", name); cond_.notify_all(); + } else { + std::cout << fmt::sprintf("Marking '%s' as synchronized (no notification)\n", name); } + } else { + std::cout << fmt::sprintf("Attempt to mark '%s' as synchronized, but current role is %s\n", name, + RaftInfo::RoleToStr(current_.role)); } } void MarkSynchronized() { std::unique_lock lck(mtx_); if (current_.role == RaftInfo::Role::Leader) { ++initialSyncDoneCnt_; + + std::cout << fmt::sprintf("Marking 'whole DB' as synchronized (with notification); initialSyncDoneCnt_ = %d\n", + initialSyncDoneCnt_); lck.unlock(); cond_.notify_all(); + } else { + std::cout << fmt::sprintf("Attempt to mark 'whole DB' as synchronized, but current role is %s\n", + RaftInfo::RoleToStr(current_.role)); } } void Reset(ContainerT requireSynchronization, size_t ReplThreadsCnt, bool enabled) { @@ -47,6 +61,7 @@ class SharedSyncState { ReplThreadsCnt_ = ReplThreadsCnt; next_ = current_ = RaftInfo(); assert(ReplThreadsCnt_); + std::cout << fmt::sprintf("Reseting sync state\n"); } template void AwaitInitialSync(std::string_view name, const ContextT& ctx) const { @@ -60,7 +75,9 @@ class SharedSyncState { if (next_.role == RaftInfo::Role::Follower) { throw Error(errWrongReplicationData, "Node role was changed to follower"); } + std::cout << fmt::sprintf("Initial sync is not done for '%s', awaiting...\n", name); cond_.wait(lck, ctx); + std::cout << fmt::sprintf("Initial sync is done for '%s'!\n", name); } } template @@ -73,7 +90,9 @@ class SharedSyncState { if (next_.role == RaftInfo::Role::Follower) { throw Error(errWrongReplicationData, "Node role was changed to follower"); } + std::cout << fmt::sprintf("Initial sync is not done for 'whole DB', awaiting...\n"); cond_.wait(lck, ctx); + std::cout << fmt::sprintf("Initial sync is done for 'whole DB'!\n"); } } bool IsInitialSyncDone(std::string_view name) const { @@ -90,10 +109,12 @@ class SharedSyncState { std::unique_lock lck(mtx_); if (expected == next_) { if (current_.role == RaftInfo::Role::Leader && current_.role != next_.role) { + std::cout << fmt::sprintf("Clearing synchronized list on role switch\n"); synchronized_.clear(); initialSyncDoneCnt_ = 0; } current_ = next_; + std::cout << fmt::sprintf("Role transition done, sending notification!\n"); lck.unlock(); cond_.notify_all(); return expected; @@ -107,6 +128,7 @@ class SharedSyncState { cond_.wait( lck, [this] { return !isRunning() || next_ == current_; }, ctx); } else { + std::cout << fmt::sprintf("Awaiting role transition... Current role is %s\n", RaftInfo::RoleToStr(current_.role)); cond_.wait( lck, [this] { @@ -114,6 +136,7 @@ class SharedSyncState { (next_ == current_ && (current_.role == RaftInfo::Role::Leader || current_.role == RaftInfo::Role::Follower)); }, ctx); + std::cout << fmt::sprintf("Role transition done! Current role is %s\n", RaftInfo::RoleToStr(current_.role)); } return current_; } diff --git a/cpp_src/core/ft/ft_fast/dataprocessor.cc b/cpp_src/core/ft/ft_fast/dataprocessor.cc index 1b3df8eb0..4621c9de2 100644 --- a/cpp_src/core/ft/ft_fast/dataprocessor.cc +++ b/cpp_src/core/ft/ft_fast/dataprocessor.cc @@ -6,6 +6,7 @@ #include "core/ft/numtotext.h" #include "core/ft/typos.h" +#include "tools/hardware_concurrency.h" #include "tools/logger.h" #include "tools/serializer.h" #include "tools/stringstools.h" @@ -131,7 +132,7 @@ std::vector DataProcessor::BuildSuffix(words_map &words_um, template size_t DataProcessor::buildWordsMap(words_map &words_um) { - uint32_t maxIndexWorkers = multithread_ ? std::thread::hardware_concurrency() : 1; + uint32_t maxIndexWorkers = multithread_ ? hardware_concurrency() : 1; if (!maxIndexWorkers) { maxIndexWorkers = 1; } else if (maxIndexWorkers > 8) { diff --git a/cpp_src/core/namespace/namespaceimpl.cc b/cpp_src/core/namespace/namespaceimpl.cc index f75de2b81..5f817925c 100644 --- a/cpp_src/core/namespace/namespaceimpl.cc +++ b/cpp_src/core/namespace/namespaceimpl.cc @@ -24,6 +24,7 @@ #include "tools/errors.h" #include "tools/flagguard.h" #include "tools/fsops.h" +#include "tools/hardware_concurrency.h" #include "tools/logger.h" #include "tools/timetools.h" #include "wal/walselecter.h" @@ -181,7 +182,7 @@ NamespaceImpl::~NamespaceImpl() { static constexpr double kDeleteNs = 0.25; const double k = dbDestroyed_.load(std::memory_order_relaxed) ? kDeleteRxDestroy : kDeleteNs; - threadsCount = k * std::thread::hardware_concurrency(); + threadsCount = k * hardware_concurrency(); if (threadsCount > indexes_.size() + 1) { threadsCount = indexes_.size() + 1; } @@ -1982,7 +1983,7 @@ void NamespaceImpl::checkUniquePK(const ConstPayload& cpl, bool inTransaction, c } void NamespaceImpl::optimizeIndexes(const NsContext& ctx) { - static const auto kHardwareConcurrency = std::thread::hardware_concurrency(); + static const auto kHardwareConcurrency = hardware_concurrency(); // This is read lock only atomics based implementation of rebuild indexes // If optimizationState_ == OptimizationCompleted is true, then indexes are completely built. // In this case reset optimizationState_ and/or any idset's and sort orders builds are allowed only protected by write lock @@ -2022,9 +2023,7 @@ void NamespaceImpl::optimizeIndexes(const NsContext& ctx) { // Update sort orders and sort_id for each index size_t currentSortId = 1; - const size_t maxIndexWorkers = kHardwareConcurrency - ? std::min(std::thread::hardware_concurrency(), config_.optimizationSortWorkers) - : config_.optimizationSortWorkers; + const size_t maxIndexWorkers = std::min(kHardwareConcurrency, config_.optimizationSortWorkers); for (auto& idxIt : indexes_) { if (idxIt->IsOrdered() && maxIndexWorkers != 0) { NSUpdateSortedContext sortCtx(*this, currentSortId++); diff --git a/cpp_src/core/reindexer_impl/reindexerimpl.cc b/cpp_src/core/reindexer_impl/reindexerimpl.cc index 4a9266e75..bd52b68ab 100644 --- a/cpp_src/core/reindexer_impl/reindexerimpl.cc +++ b/cpp_src/core/reindexer_impl/reindexerimpl.cc @@ -25,6 +25,7 @@ #include "tools/catch_and_return.h" #include "tools/errors.h" #include "tools/fsops.h" +#include "tools/hardware_concurrency.h" #include "tools/logger.h" #include "debug/backtrace.h" @@ -51,7 +52,7 @@ constexpr char kActionConfigType[] = "action"; constexpr unsigned kStorageLoadingThreads = 6; static unsigned ConcurrentNamespaceLoaders() noexcept { - const auto hwConc = std::thread::hardware_concurrency(); + const auto hwConc = hardware_concurrency(); if (hwConc <= 4) { return 1; } else if (hwConc < 8) { // '<' is not a typo diff --git a/cpp_src/events/event_subscriber_config.cc b/cpp_src/events/event_subscriber_config.cc new file mode 100644 index 000000000..4b4ce0519 --- /dev/null +++ b/cpp_src/events/event_subscriber_config.cc @@ -0,0 +1,76 @@ +#include "event_subscriber_config.h" +#include "core/cjson/jsonbuilder.h" + +namespace reindexer { + +using namespace std::string_view_literals; + +Error EventSubscriberConfig::FromJSON(span json) { + try { + FromJSON(gason::JsonParser().Parse(json)); + } catch (const gason::Exception &ex) { + return Error(errParseJson, "UpdatesFilter: %s", ex.what()); + } catch (const Error &err) { + return err; + } + return {}; +} + +void EventSubscriberConfig::FromJSON(const gason::JsonNode &root) { + formatVersion_ = root["version"sv].As(-1); + if (formatVersion_ < kMinSubscribersConfigFormatVersion) { + throw Error(errParams, "Min supported subscribers config format version is %d, but %d version was found in JSON", + kMinSubscribersConfigFormatVersion, formatVersion_) + } + streams_.clear(); + streams_.resize(kMaxStreamsPerSub); + + withDBName_ = root["with_db_name"sv].As(false); + for (const auto &stream : root["streams"sv]) { + const int id = stream["id"].As(-1); + if (id < 0 || id >= streams_.size()) { + throw Error(errParams, "Stream ID %d is out of range", id); + } + if (streams_[id].has_value()) { + throw Error(errParams, "Stream ID %d is duplicated", id); + } + + auto &s = streams_[id].emplace(); + s.withConfigNamespace = stream["with_config_namespace"sv].As(false); + for (const auto &ns : stream["namespaces"sv]) { + auto name = ns["name"sv].As(); + for (const auto &f : ns["filters"sv]) { + UpdatesFilters::Filter filter; + filter.FromJSON(f); + s.filters.AddFilter(name, std::move(filter)); + } + } + } +} + +void EventSubscriberConfig::GetJSON(WrSerializer &ser) const { + JsonBuilder builder(ser); + { + builder.Put("version"sv, formatVersion_); + builder.Put("with_config_namespace"sv, withDBName_); + auto streamArr = builder.Array("streams"sv); + for (size_t i = 0; i < streams_.size(); ++i) { + auto streamObj = streamArr.Object(); + if (streams_[i].has_value()) { + streamObj.Put("with_config_namespace"sv, streams_[i]->withConfigNamespace); + auto nsArr = streamObj.Array("namespaces"sv); + for (const auto &nsFilters : streams_[i]->filters) { + auto obj = nsArr.Object(); + obj.Put("name"sv, nsFilters.first); + auto arrFilters = obj.Array("filters"sv); + for (const auto &filter : nsFilters.second) { + auto filtersObj = arrFilters.Object(); + filter.GetJSON(filtersObj); + } + } + } + } + } +} + +} // namespace reindexer diff --git a/cpp_src/events/event_subscriber_config.h b/cpp_src/events/event_subscriber_config.h new file mode 100644 index 000000000..9adbb13f1 --- /dev/null +++ b/cpp_src/events/event_subscriber_config.h @@ -0,0 +1,79 @@ +#pragma once + +#include +#include "core/type_consts.h" +#include "estl/fast_hash_map.h" +#include "estl/span.h" +#include "tools/errors.h" +#include "tools/stringstools.h" + +namespace gason { +struct JsonNode; +} + +namespace reindexer { + +class JsonBuilder; + +/// Object of this class contains filters set. Filters are separated by namespace and concatenated with disjunction +class UpdatesFilters { +public: + class Filter { + public: + // TODO: Any additional condition check should be added here + bool Check() const { return true; } + void FromJSON(const gason::JsonNode &) {} + void GetJSON(JsonBuilder &) const {} + + bool operator==(const Filter &) const { return true; } + }; + + /// Merge two filters sets + /// If one of the filters set is empty, result filters set will also be empty + /// If one of the filters set contains some conditions for specific namespace, + /// then result filters set will also contain this conditions + /// @param rhs - Another filters set + void Merge(const UpdatesFilters &rhs); + /// Add new filter for specified namespace. Doesn't merge filters, just concatenates it into disjunction sequence + /// @param ns - Namespace + /// @param filter - Filter to add + void AddFilter(std::string_view ns, Filter filter); + /// Check if filters set allows this namespace + /// @param ns - Namespace + /// @return 'true' if filter's conditions are satisfied + bool Check(std::string_view ns) const; + + Error FromJSON(span json); + void FromJSON(const gason::JsonNode &root); + void GetJSON(WrSerializer &ser) const; + + bool operator==(const UpdatesFilters &rhs) const; + +private: + using FiltersList = h_vector; + + fast_hash_map filters_; +}; + +class EventSubscriberConfig { +public: + struct StreamConfig { + UpdatesFilters filters; + bool withConfigNamespace = false; + }; + using StreamsContainerT = std::vector>; + + Error FromJSON(span json); + void FromJSON(const gason::JsonNode &root); + void GetJSON(WrSerializer &ser) const; + + const StreamsContainerT &Streams() const noexcept { return streams_; } + bool WithDBName() const noexcept { return withDBName_; } + +private: + int formatVersion_ = kSubscribersConfigFormatVersion; + bool withDBName_ = false; + StreamsContainerT streams_; +}; + +} // namespace reindexer diff --git a/cpp_src/gtests/tests/unit/rpcclient_test.cc b/cpp_src/gtests/tests/unit/rpcclient_test.cc index 6918b524e..a53f11310 100644 --- a/cpp_src/gtests/tests/unit/rpcclient_test.cc +++ b/cpp_src/gtests/tests/unit/rpcclient_test.cc @@ -13,6 +13,7 @@ #include "gtests/tests/gtest_cout.h" #include "net/ev/ev.h" #include "reindexertestapi.h" +#include "tools/hardware_concurrency.h" using std::chrono::seconds; @@ -43,7 +44,7 @@ TEST_F(RPCClientTestApi, CoroRequestTimeout) { } static std::chrono::seconds GetMaxTimeForCoroSelectTimeout(unsigned requests, std::chrono::seconds delay) { - const auto cpus = std::thread::hardware_concurrency(); + const auto cpus = reindexer::hardware_concurrency(); const auto kBase = std::max(requests * delay.count() / 16, delay.count()); const std::chrono::seconds kDefaultMaxTime(kBase + 10); if (cpus == 0) { diff --git a/cpp_src/net/listener.cc b/cpp_src/net/listener.cc index 1f7790aa0..a1eec767d 100644 --- a/cpp_src/net/listener.cc +++ b/cpp_src/net/listener.cc @@ -7,6 +7,7 @@ #include "net/http/serverconnection.h" #include "server/pprof/gperf_profiler.h" #include "tools/alloc_ext/tc_malloc_extension.h" +#include "tools/hardware_concurrency.h" #include "tools/logger.h" namespace reindexer { @@ -33,8 +34,7 @@ Listener::Listener(ev::dynamic_loop &loop, std::shared_ptr shared) template Listener::Listener(ev::dynamic_loop &loop, ConnectionFactory &&connFactory, int maxListeners) - : Listener(loop, - std::make_shared(std::move(connFactory), (maxListeners ? maxListeners : std::thread::hardware_concurrency()) + 1)) {} + : Listener(loop, std::make_shared(std::move(connFactory), (maxListeners ? maxListeners : hardware_concurrency()) + 1)) {} template Listener::~Listener() { diff --git a/cpp_src/tools/hardware_concurrency.h b/cpp_src/tools/hardware_concurrency.h new file mode 100644 index 000000000..ceaa885b2 --- /dev/null +++ b/cpp_src/tools/hardware_concurrency.h @@ -0,0 +1,16 @@ +#pragma once + +#include + +#include +#include "vendor/spdlog/fmt/fmt.h" + +namespace reindexer { + +// Wrapper to handle situation, when std::thread::hardware_concurrency returns 0. +inline unsigned hardware_concurrency() noexcept { + std::cout << fmt::sprintf("std::hardware_concurrency: %d\n", std::thread::hardware_concurrency()); + return std::max(std::thread::hardware_concurrency(), 1u); +} + +} // namespace reindexer diff --git a/cpp_src/vendor/sparse-map/sparse_hash.h b/cpp_src/vendor/sparse-map/sparse_hash.h index 5115de301..a645a9bf9 100644 --- a/cpp_src/vendor/sparse-map/sparse_hash.h +++ b/cpp_src/vendor/sparse-map/sparse_hash.h @@ -42,6 +42,8 @@ #include "sparse_growth_policy.h" +#include "tools/hardware_concurrency.h" + #ifdef __INTEL_COMPILER #include // For _popcnt32 and _popcnt64 #endif @@ -1084,10 +1086,11 @@ class sparse_hash : private Allocator, private Hash, private KeyEqual, private G m_sparse_buckets = m_sparse_buckets_data.empty() ? static_empty_sparse_bucket_ptr() : m_sparse_buckets_data.data(); } - sparse_hash(sparse_hash &&other) noexcept( - std::is_nothrow_move_constructible::value &&std::is_nothrow_move_constructible::value - &&std::is_nothrow_move_constructible::value &&std::is_nothrow_move_constructible::value - &&std::is_nothrow_move_constructible::value) + sparse_hash(sparse_hash &&other) noexcept(std::is_nothrow_move_constructible::value && + std::is_nothrow_move_constructible::value && + std::is_nothrow_move_constructible::value && + std::is_nothrow_move_constructible::value && + std::is_nothrow_move_constructible::value) : Allocator(std::move(other)), Hash(std::move(other)), KeyEqual(std::move(other)), @@ -1242,7 +1245,7 @@ class sparse_hash : private Allocator, private Hash, private KeyEqual, private G } }); } else { - unsigned int partCount = std::thread::hardware_concurrency(); + unsigned int partCount = reindexer::hardware_concurrency(); for (unsigned int i = 0; i < partCount; i++) { size_t from = m_sparse_buckets_data.size() / partCount * i; size_t to = i + 1 == partCount ? m_sparse_buckets_data.size() : m_sparse_buckets_data.size() / partCount * (i + 1);