From e8781eb5555b06b1c92d901f347e2186ce026402 Mon Sep 17 00:00:00 2001 From: Jose Date: Tue, 17 Dec 2024 12:38:46 +0100 Subject: [PATCH 1/3] DataStorm minor fixes --- cpp/include/DataStorm/DataStorm.h | 6 +++--- cpp/include/DataStorm/InternalT.h | 26 ++++++++++++-------------- cpp/src/DataStorm/DataElementI.cpp | 4 ++-- cpp/src/DataStorm/DataElementI.h | 2 +- cpp/src/DataStorm/SessionI.cpp | 23 +++++++++++++++++------ 5 files changed, 35 insertions(+), 26 deletions(-) diff --git a/cpp/include/DataStorm/DataStorm.h b/cpp/include/DataStorm/DataStorm.h index 4debd8f8e80..57f01a598c2 100644 --- a/cpp/include/DataStorm/DataStorm.h +++ b/cpp/include/DataStorm/DataStorm.h @@ -1842,7 +1842,7 @@ namespace DataStorm return [](const std::string& criteria) { std::regex expr(criteria); - return [expr](const Value& value) + return [expr = std::move(expr)](const Value& value) { std::ostringstream os; os << value; @@ -1888,8 +1888,8 @@ namespace DataStorm _topicFactory(node._factory), _keyFactory(DataStormI::KeyFactoryT::createFactory()), _tagFactory(DataStormI::TagFactoryT::createFactory()), - _keyFilterFactories(DataStormI::FilterManagerT>::create()), - _sampleFilterFactories(DataStormI::FilterManagerT>::create()) + _keyFilterFactories(std::make_shared>>()), + _sampleFilterFactories(std::make_shared>>()) { RegexFilter::add(_keyFilterFactories); RegexFilter, Value>::add(_sampleFilterFactories); diff --git a/cpp/include/DataStorm/InternalT.h b/cpp/include/DataStorm/InternalT.h index 12f7427e79f..f4495ffb767 100644 --- a/cpp/include/DataStorm/InternalT.h +++ b/cpp/include/DataStorm/InternalT.h @@ -136,12 +136,13 @@ namespace DataStormI template class AbstractFactoryT : public std::enable_shared_from_this> { + /// A custom deleter to remove the element from the factory when the shared_ptr is deleted. + /// The deleter is used by elements created by the factory. struct Deleter { void operator()(V* obj) { - auto factory = _factory.lock(); - if (factory) + if (auto factory = _factory.lock()) { factory->remove(obj); } @@ -155,7 +156,7 @@ namespace DataStormI public: AbstractFactoryT() : _nextId(1) {} - void init() { _deleter = {std::enable_shared_from_this>::shared_from_this()}; } + void init() { _deleter = Deleter{._factory = std::enable_shared_from_this>::shared_from_this()}; } template std::shared_ptr create(F&& value, Args&&... args) @@ -168,7 +169,7 @@ namespace DataStormI { std::lock_guard lock(_mutex); std::vector> seq; - for (auto& v : values) + for (auto v : values) { seq.push_back(createImpl(std::move(v))); } @@ -184,17 +185,15 @@ namespace DataStormI auto p = _elementsById.find(id); if (p != _elementsById.end()) { - auto k = p->second.lock(); - if (k) - { - return k; - } + return p->second.lock(); } return nullptr; } template std::shared_ptr createImpl(F&& value, Args&&... args) { + // Called with _mutex locked + auto p = _elements.find(value); if (p != _elements.end()) { @@ -515,20 +514,19 @@ namespace DataStormI }; public: - FilterManagerT() {} template std::shared_ptr create(const std::string& name, const Criteria& criteria) { auto p = _factories.find(name); if (p == _factories.end()) { - throw std::invalid_argument("unknown filter `" + name + "'"); + throw std::invalid_argument("unknown filter '" + name + "'"); } auto factory = dynamic_cast*>(p->second.get()); if (!factory) { - throw std::invalid_argument("filter `" + name + "' type doesn't match"); + throw std::invalid_argument("filter '" + name + "' type doesn't match"); } return factory->create(criteria); @@ -571,9 +569,9 @@ namespace DataStormI } } - static std::shared_ptr> create() { return std::make_shared>(); } - private: + + // A map containing the filter factories, indexed by the filter name. std::map> _factories; }; } diff --git a/cpp/src/DataStorm/DataElementI.cpp b/cpp/src/DataStorm/DataElementI.cpp index d94e01a7f55..92e53627fb0 100644 --- a/cpp/src/DataStorm/DataElementI.cpp +++ b/cpp/src/DataStorm/DataElementI.cpp @@ -1360,12 +1360,12 @@ FilteredDataReaderI::FilteredDataReaderI( TopicReaderI* topic, string name, int64_t id, - const shared_ptr& filter, + shared_ptr filter, string sampleFilterName, Ice::ByteSeq sampleFilterCriteria, const DataStorm::ReaderConfig& config) : DataReaderI(topic, std::move(name), id, std::move(sampleFilterName), std::move(sampleFilterCriteria), config), - _filter(filter) + _filter(std::move(filter)) { if (_traceLevels->data > 0) { diff --git a/cpp/src/DataStorm/DataElementI.h b/cpp/src/DataStorm/DataElementI.h index 39309080636..19289403551 100644 --- a/cpp/src/DataStorm/DataElementI.h +++ b/cpp/src/DataStorm/DataElementI.h @@ -469,7 +469,7 @@ namespace DataStormI TopicReaderI*, std::string, std::int64_t, - const std::shared_ptr&, + std::shared_ptr, std::string, Ice::ByteSeq, const DataStorm::ReaderConfig&); diff --git a/cpp/src/DataStorm/SessionI.cpp b/cpp/src/DataStorm/SessionI.cpp index 642843cdbc2..e6ca9ee1c0c 100644 --- a/cpp/src/DataStorm/SessionI.cpp +++ b/cpp/src/DataStorm/SessionI.cpp @@ -106,7 +106,8 @@ SessionI::announceTopics(TopicInfoSeq topics, bool, const Ice::Current&) }); } - // Reap un-visited topics + // Reap dead topics corresponding to subscriptions from a previous session instance ID. Subscribers from the + // previous session instance ID that were not reattached to the new session instance ID are removed. auto p = _topics.begin(); while (p != _topics.end()) { @@ -604,10 +605,19 @@ bool SessionI::disconnected(const Ice::ConnectionPtr& connection, exception_ptr ex) { lock_guard lock(_mutex); - if (_destroyed || (connection && _connection != connection) || !_session) + if (_destroyed) { - // Ignore since we are either already destroyed, or disconnected, or a new connection has already been - // established. + // Ignore already destroyed. + return false; + } + else if (connection && _connection != connection) + { + // Ignore the session has already reconnected using a new connection. + return false; + } + else if (!_session) + { + // Ignore if the session is already disconnected. return false; } @@ -633,8 +643,9 @@ SessionI::disconnected(const Ice::ConnectionPtr& connection, exception_ptr ex) } } + // Detach all topics from the session. auto self = shared_from_this(); - for (auto& t : _topics) + for (const auto& t : _topics) { runWithTopics(t.first, [id = t.first, self](TopicI* topic, TopicSubscriber&) { topic->detach(id, self); }); } @@ -886,7 +897,7 @@ SessionI::unsubscribe(int64_t id, TopicI* topic) { for (auto& [element, elementSubscriber] : elementSubscribers.getSubscribers()) { - for (auto key : elementSubscriber.keys) + for (const auto& key : elementSubscriber.keys) { if (elementId > 0) { From 19d04d53ce94aa8c5ab07d4df1e358c968897bb3 Mon Sep 17 00:00:00 2001 From: Jose Date: Tue, 17 Dec 2024 12:47:46 +0100 Subject: [PATCH 2/3] clang-format fixes --- cpp/include/DataStorm/DataStorm.h | 3 ++- cpp/include/DataStorm/InternalT.h | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cpp/include/DataStorm/DataStorm.h b/cpp/include/DataStorm/DataStorm.h index 57f01a598c2..1e16cba679c 100644 --- a/cpp/include/DataStorm/DataStorm.h +++ b/cpp/include/DataStorm/DataStorm.h @@ -1889,7 +1889,8 @@ namespace DataStorm _keyFactory(DataStormI::KeyFactoryT::createFactory()), _tagFactory(DataStormI::TagFactoryT::createFactory()), _keyFilterFactories(std::make_shared>>()), - _sampleFilterFactories(std::make_shared>>()) + _sampleFilterFactories( + std::make_shared>>()) { RegexFilter::add(_keyFilterFactories); RegexFilter, Value>::add(_sampleFilterFactories); diff --git a/cpp/include/DataStorm/InternalT.h b/cpp/include/DataStorm/InternalT.h index f4495ffb767..6556b333ff4 100644 --- a/cpp/include/DataStorm/InternalT.h +++ b/cpp/include/DataStorm/InternalT.h @@ -156,7 +156,10 @@ namespace DataStormI public: AbstractFactoryT() : _nextId(1) {} - void init() { _deleter = Deleter{._factory = std::enable_shared_from_this>::shared_from_this()}; } + void init() + { + _deleter = Deleter{._factory = std::enable_shared_from_this>::shared_from_this()}; + } template std::shared_ptr create(F&& value, Args&&... args) @@ -514,7 +517,6 @@ namespace DataStormI }; public: - template std::shared_ptr create(const std::string& name, const Criteria& criteria) { auto p = _factories.find(name); @@ -570,7 +572,6 @@ namespace DataStormI } private: - // A map containing the filter factories, indexed by the filter name. std::map> _factories; }; From 31b34c103748d626ab4c99d9d013cfe148a1b824 Mon Sep 17 00:00:00 2001 From: Jose Date: Tue, 17 Dec 2024 16:11:02 +0100 Subject: [PATCH 3/3] Update cpp/include/DataStorm/InternalT.h --- cpp/include/DataStorm/InternalT.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/include/DataStorm/InternalT.h b/cpp/include/DataStorm/InternalT.h index 6556b333ff4..603d5d9b00b 100644 --- a/cpp/include/DataStorm/InternalT.h +++ b/cpp/include/DataStorm/InternalT.h @@ -172,7 +172,7 @@ namespace DataStormI { std::lock_guard lock(_mutex); std::vector> seq; - for (auto v : values) + for (auto& v : values) { seq.push_back(createImpl(std::move(v))); }