From f9b8b96fd82ec7e0ab9012fd8e4c11a93aa0c6fb Mon Sep 17 00:00:00 2001 From: Jose Date: Tue, 19 Nov 2024 16:58:53 +0100 Subject: [PATCH] DataStorm fixes for configuration, session retries, and more (#3166) --- config/PropertyNames.xml | 5 + cpp/src/DataStorm/NodeI.cpp | 3 + cpp/src/DataStorm/SessionI.cpp | 76 +++++-------- cpp/src/DataStorm/SessionI.h | 1 - cpp/src/DataStorm/TopicI.cpp | 200 +++++++++++++++------------------ cpp/src/DataStorm/TopicI.h | 17 +-- cpp/src/Ice/PropertyNames.cpp | 7 +- 7 files changed, 138 insertions(+), 171 deletions(-) diff --git a/config/PropertyNames.xml b/config/PropertyNames.xml index a0dd7e84b73..7836f952aad 100644 --- a/config/PropertyNames.xml +++ b/config/PropertyNames.xml @@ -409,6 +409,11 @@ + + + + + diff --git a/cpp/src/DataStorm/NodeI.cpp b/cpp/src/DataStorm/NodeI.cpp index 345ede60d5b..da8cf37d9fd 100644 --- a/cpp/src/DataStorm/NodeI.cpp +++ b/cpp/src/DataStorm/NodeI.cpp @@ -212,6 +212,8 @@ NodeI::createSession( nullptr, [=](auto ex) { self->removePublisherSession(*subscriber, session, ex); }); assert(!s->ice_getCachedConnection() || s->ice_getCachedConnection() == connection); + + // Session::connected informs the subscriber session of all the topic writers in the current node. session->connected( *subscriberSession, connection, @@ -257,6 +259,7 @@ NodeI::confirmCreateSession( publisherSession = publisherSession->ice_fixed(current.con); } + // Session::connected informs the publisher session of all the topic readers in the current node. session->connected(*publisherSession, current.con, getInstance()->getTopicFactory()->getTopicReaders()); } diff --git a/cpp/src/DataStorm/SessionI.cpp b/cpp/src/DataStorm/SessionI.cpp index 7b28e4676aa..8a0bd5274b2 100644 --- a/cpp/src/DataStorm/SessionI.cpp +++ b/cpp/src/DataStorm/SessionI.cpp @@ -512,27 +512,23 @@ SessionI::disconnected(const Ice::Current& current) } void -SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& connection, const TopicInfoSeq& topics) +SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& newConnection, const TopicInfoSeq& topics) { lock_guard lock(_mutex); if (_destroyed || _session) { - assert(_connectedCallbacks.empty()); return; } _session = session; - _connection = connection; - if (connection) + _connection = newConnection; + if (newConnection) { auto self = shared_from_this(); -#if defined(__GNUC__) -# pragma GCC diagnostic push -# pragma GCC diagnostic ignored "-Wshadow" -#endif + // Register a callback with the connection manager to reconnect the session if the connection is closed. _instance->getConnectionManager()->add( - connection, + newConnection, self, [self](auto connection, auto ex) { @@ -544,9 +540,6 @@ SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& connection, co } } }); -#if defined(__GNUC__) -# pragma GCC diagnostic pop -#endif } if (_retryTask) @@ -571,6 +564,7 @@ SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& connection, co { try { + // Announce the topics to the peer, don't wait for the result. _session->announceTopicsAsync(topics, true, nullptr); } catch (const Ice::LocalException&) @@ -578,12 +572,6 @@ SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& connection, co // Ignore } } - - for (auto c : _connectedCallbacks) - { - c(_proxy); - } - _connectedCallbacks.clear(); } bool @@ -592,6 +580,7 @@ SessionI::disconnected(const Ice::ConnectionPtr& connection, exception_ptr ex) lock_guard lock(_mutex); if (_destroyed || (connection && _connection != connection) || !_session) { + // Ignore we either already destroyed, or disconnected, or a new connection has already been established. return false; } @@ -605,21 +594,22 @@ SessionI::disconnected(const Ice::ConnectionPtr& connection, exception_ptr ex) } else { - throw Ice::CloseConnectionException(__FILE__, __LINE__); + throw Ice::CloseConnectionException{__FILE__, __LINE__}; } } catch (const std::exception& e) { Trace out(_traceLevels, _traceLevels->sessionCat); - out << _id << ": session `" << _session->ice_getIdentity() << "' disconnected:\n"; + out << _id << ": session '" << _session->ice_getIdentity() << "' disconnected:\n"; out << (_connection ? _connection->toString() : "") << "\n"; out << e.what(); } } + auto self = shared_from_this(); for (auto& t : _topics) { - runWithTopics(t.first, [&](TopicI* topic, TopicSubscriber&) { topic->detach(t.first, shared_from_this()); }); + runWithTopics(t.first, [id = t.first, self](TopicI* topic, TopicSubscriber&) { topic->detach(id, self); }); } _session = nullopt; @@ -635,9 +625,7 @@ SessionI::retry(NodePrx node, exception_ptr exception) if (exception) { - // // Don't retry if we are shutting down. - // try { rethrow_exception(exception); @@ -655,20 +643,17 @@ SessionI::retry(NodePrx node, exception_ptr exception) } } - if (node->ice_getEndpoints().empty() && node->ice_getAdapterId().empty()) + // Cancel any pending retry task, before we start a new one below. + if (_retryTask) { - if (_retryTask) - { - _instance->cancelTimerTask(_retryTask); - _retryTask = nullptr; - } - _retryCount = 0; + _instance->cancelTimerTask(_retryTask); + _retryTask = nullptr; + } - // - // If we can't retry connecting to the node because we don't have its endpoints, - // we just wait for the duration of the last retry delay for the peer to reconnect. - // If it doesn't reconnect, we'll destroy this session after the timeout. - // + if (node->ice_getEndpoints().empty() && node->ice_getAdapterId().empty()) + { + // We cannot retry because we don't have the peer endpoints. Wait twice the last retry interval for the peer to + // reconnect. auto delay = _instance->getRetryDelay(_instance->getRetryCount()) * 2; if (_traceLevels->session > 0) @@ -678,16 +663,15 @@ SessionI::retry(NodePrx node, exception_ptr exception) << " (ms) for peer to reconnect"; } + // Schedule a timer to remove the session if the peer doesn't reconnect. _retryTask = make_shared([self = shared_from_this()] { self->remove(); }); _instance->scheduleTimerTask(_retryTask, delay); } else { - // - // If we can retry the connection attempt, we schedule a timer to retry. Always - // retry immediately on the first attempt. - // + // Schedule a timer to retry. Always retry immediately on the first attempt. auto delay = _retryCount == 0 ? 0ms : _instance->getRetryDelay(_retryCount - 1); + // Increment the retry count, it is reset by disconnected. ++_retryCount; if (_traceLevels->session > 0) @@ -703,13 +687,14 @@ SessionI::retry(NodePrx node, exception_ptr exception) out << _id << ": connection to `" << node->ice_toString() << "` failed and the retry limit has been reached"; } + if (exception) { try { rethrow_exception(exception); } - catch (const Ice::LocalException& ex) + catch (const std::exception& ex) { out << '\n' << ex.what(); } @@ -770,21 +755,14 @@ SessionI::destroyImpl(const exception_ptr& ex) _session = nullopt; _connection = nullptr; + auto self = shared_from_this(); for (const auto& t : _topics) { - runWithTopics( - t.first, - [&](TopicI* topic, TopicSubscriber&) { topic->detach(t.first, shared_from_this()); }); + runWithTopics(t.first, [id = t.first, self](TopicI* topic, TopicSubscriber&) { topic->detach(id, self); }); } _topics.clear(); } - for (auto c : _connectedCallbacks) - { - c(nullopt); - } - _connectedCallbacks.clear(); - try { _instance->getObjectAdapter()->remove(_proxy->ice_getIdentity()); diff --git a/cpp/src/DataStorm/SessionI.h b/cpp/src/DataStorm/SessionI.h index 19b950435ff..8990db725a5 100644 --- a/cpp/src/DataStorm/SessionI.h +++ b/cpp/src/DataStorm/SessionI.h @@ -345,7 +345,6 @@ namespace DataStormI std::optional _session; Ice::ConnectionPtr _connection; - std::vector)>> _connectedCallbacks; }; class SubscriberSessionI : public SessionI, public DataStormContract::SubscriberSession diff --git a/cpp/src/DataStorm/TopicI.cpp b/cpp/src/DataStorm/TopicI.cpp index 4a96bb142ee..723e5e49488 100644 --- a/cpp/src/DataStorm/TopicI.cpp +++ b/cpp/src/DataStorm/TopicI.cpp @@ -13,24 +13,6 @@ using namespace DataStormContract; namespace { - template vector toSeq(const map& map) - { - vector seq; - seq.reserve(map.size()); - for (const auto& p : map) - { - seq.push_back(p.second); - } - return seq; - } - - int toInt(const string& v, int value = 0) - { - istringstream is(v); - is >> value; - return value; - } - static Topic::Updater noOpUpdater = [](const shared_ptr& previous, const shared_ptr& next, const Ice::CommunicatorPtr&) { next->setValue(previous); }; @@ -57,6 +39,54 @@ namespace bool match(const shared_ptr&) const final { return true; } }; const auto alwaysMatchFilter = make_shared(); + + DataStorm::ClearHistoryPolicy parseClearHistory(const std::string& value) + { + if (value == "OnAdd") + { + return DataStorm::ClearHistoryPolicy::OnAdd; + } + else if (value == "OnRemove") + { + return DataStorm::ClearHistoryPolicy::OnRemove; + } + else if (value == "OnAll") + { + return DataStorm::ClearHistoryPolicy::OnAll; + } + else if (value == "OnAllExceptPartialUpdate") + { + return DataStorm::ClearHistoryPolicy::OnAllExceptPartialUpdate; + } + else if (value == "Never") + { + return DataStorm::ClearHistoryPolicy::Never; + } + else + { + throw Ice::ParseException(__FILE__, __LINE__, "Invalid clear history policy: " + value); + } + } + + DataStorm::DiscardPolicy parseDiscardPolicy(const std::string& value) + { + if (value == "Never") + { + return DataStorm::DiscardPolicy::None; + } + else if (value == "SendTime") + { + return DataStorm::DiscardPolicy::SendTime; + } + else if (value == "Priority") + { + return DataStorm::DiscardPolicy::Priority; + } + else + { + throw Ice::ParseException(__FILE__, __LINE__, "Invalid discard policy: " + value); + } + } } TopicI::TopicI( @@ -272,24 +302,24 @@ TopicI::getElementSpecs(int64_t topicId, const ElementInfoSeq& infos, const shar } void -TopicI::attach(int64_t id, const shared_ptr& session, SessionPrx prx) +TopicI::attach(int64_t id, shared_ptr session, SessionPrx peerSession) { - auto p = _listeners.find(ListenerKey{session}); + auto p = _listeners.find(session); if (p == _listeners.end()) { - p = _listeners.emplace(ListenerKey{session}, Listener(std::move(prx))).first; + p = _listeners.emplace(std::move(session), Listener(std::move(peerSession))).first; } if (p->second.topics.insert(id).second) { - session->subscribe(id, this); + p->first->subscribe(id, this); } } void TopicI::detach(int64_t id, const shared_ptr& session) { - auto p = _listeners.find(ListenerKey{session}); + auto p = _listeners.find(session); if (p != _listeners.end() && p->second.topics.erase(id)) { session->unsubscribe(id, this); @@ -710,16 +740,17 @@ TopicI::notifyListenerWaiters(unique_lock& lock) const void TopicI::disconnect() { - map listeners; + map, Listener> listeners; { unique_lock lock(_mutex); listeners.swap(_listeners); } + for (auto s : listeners) { for (auto t : s.second.topics) { - s.first.session->disconnect(t, this); + s.first->disconnect(t, this); } } @@ -782,6 +813,7 @@ TopicI::add(const shared_ptr& element, const vectorgetId(), "", key->encode(_instance->getCommunicator())}); p->second.insert(element); } + if (!infos.empty()) { try @@ -817,46 +849,6 @@ TopicI::addFiltered(const shared_ptr& element, const shared_ptrsecond); - } - p = properties.find(prefix + ".SampleCount"); - if (p != properties.end()) - { - config.sampleCount = toInt(p->second); - } - p = properties.find(prefix + ".ClearHistory"); - if (p != properties.end()) - { - if (p->second == "OnAdd") - { - config.clearHistory = DataStorm::ClearHistoryPolicy::OnAdd; - } - else if (p->second == "OnRemove") - { - config.clearHistory = DataStorm::ClearHistoryPolicy::OnRemove; - } - else if (p->second == "OnAll") - { - config.clearHistory = DataStorm::ClearHistoryPolicy::OnAll; - } - else if (p->second == "OnAllExceptPartialUpdate") - { - config.clearHistory = DataStorm::ClearHistoryPolicy::OnAllExceptPartialUpdate; - } - else if (p->second == "Never") - { - config.clearHistory = DataStorm::ClearHistoryPolicy::Never; - } - } -} - TopicReaderI::TopicReaderI( shared_ptr factory, shared_ptr keyFactory, @@ -876,8 +868,7 @@ TopicReaderI::TopicReaderI( std::move(name), id) { - _defaultConfig = {-1, 0, DataStorm::ClearHistoryPolicy::OnAll, DataStorm::DiscardPolicy::None}; - _defaultConfig = mergeConfigs(parseConfig("DataStorm.Topic")); + _defaultConfig = parseConfig(); } shared_ptr @@ -954,47 +945,41 @@ TopicReaderI::destroy() } DataStorm::ReaderConfig -TopicReaderI::parseConfig(const string& prefix) const +TopicReaderI::parseConfig() const { + auto properties = _instance->getCommunicator()->getProperties(); DataStorm::ReaderConfig config; - auto properties = _instance->getCommunicator()->getProperties()->getPropertiesForPrefix(prefix); - parseConfigImpl(properties, prefix, config); - auto p = properties.find(prefix + ".DiscardPolicy"); - if (p != properties.end()) - { - if (p->second == "None") - { - config.discardPolicy = DataStorm::DiscardPolicy::None; - } - else if (p->second == "SendTime") - { - config.discardPolicy = DataStorm::DiscardPolicy::SendTime; - } - else if (p->second == "SendTime") - { - config.discardPolicy = DataStorm::DiscardPolicy::Priority; - } - } + config.clearHistory = parseClearHistory(properties->getIceProperty("DataStorm.Topic.ClearHistory")); + config.sampleCount = properties->getIcePropertyAsInt("DataStorm.Topic.SampleCount"); + config.sampleLifetime = properties->getIcePropertyAsInt("DataStorm.Topic.SampleLifetime"); + config.discardPolicy = parseDiscardPolicy(properties->getIceProperty("DataStorm.Topic.DiscardPolicy")); return config; } DataStorm::ReaderConfig TopicReaderI::mergeConfigs(DataStorm::ReaderConfig config) const { - if (!config.sampleCount && _defaultConfig.sampleCount) + if (!config.sampleCount.has_value()) { + assert(_defaultConfig.sampleCount.has_value()); config.sampleCount = _defaultConfig.sampleCount; } - if (!config.sampleLifetime && _defaultConfig.sampleLifetime) + + if (!config.sampleLifetime.has_value()) { + assert(_defaultConfig.sampleLifetime.has_value()); config.sampleLifetime = _defaultConfig.sampleLifetime; } - if (!config.clearHistory && _defaultConfig.clearHistory) + + if (!config.clearHistory.has_value()) { + assert(_defaultConfig.clearHistory.has_value()); config.clearHistory = _defaultConfig.clearHistory; } - if (!config.discardPolicy && _defaultConfig.discardPolicy) + + if (!config.discardPolicy.has_value()) { + assert(_defaultConfig.discardPolicy.has_value()); config.discardPolicy = _defaultConfig.discardPolicy; } return config; @@ -1019,8 +1004,7 @@ TopicWriterI::TopicWriterI( std::move(name), id) { - _defaultConfig = {-1, 0, DataStorm::ClearHistoryPolicy::OnAll}; - _defaultConfig = mergeConfigs(parseConfig("DataStorm.Topic")); + _defaultConfig = parseConfig(); } shared_ptr @@ -1064,39 +1048,41 @@ TopicWriterI::destroy() } DataStorm::WriterConfig -TopicWriterI::parseConfig(const string& prefix) const +TopicWriterI::parseConfig() const { + auto properties = _instance->getCommunicator()->getProperties(); DataStorm::WriterConfig config; - auto properties = _instance->getCommunicator()->getProperties()->getPropertiesForPrefix(prefix); - parseConfigImpl(properties, prefix, config); - auto p = properties.find(prefix + ".Priority"); - if (p != properties.end()) - { - istringstream is(p->second); - int priority; - is >> priority; - config.priority = priority; - } + config.clearHistory = parseClearHistory(properties->getIceProperty("DataStorm.Topic.ClearHistory")); + config.sampleCount = properties->getIcePropertyAsInt("DataStorm.Topic.SampleCount"); + config.sampleLifetime = properties->getIcePropertyAsInt("DataStorm.Topic.SampleLifetime"); + config.priority = properties->getIcePropertyAsInt("DataStorm.Topic.Priority"); return config; } DataStorm::WriterConfig TopicWriterI::mergeConfigs(DataStorm::WriterConfig config) const { - if (!config.sampleCount && _defaultConfig.sampleCount) + if (!config.sampleCount.has_value()) { + assert(_defaultConfig.sampleCount.has_value()); config.sampleCount = _defaultConfig.sampleCount; } - if (!config.sampleLifetime && _defaultConfig.sampleLifetime) + + if (!config.sampleLifetime.has_value()) { + assert(_defaultConfig.sampleLifetime.has_value()); config.sampleLifetime = _defaultConfig.sampleLifetime; } - if (!config.clearHistory && _defaultConfig.clearHistory) + + if (!config.clearHistory.has_value()) { + assert(_defaultConfig.clearHistory.has_value()); config.clearHistory = _defaultConfig.clearHistory; } - if (!config.priority && _defaultConfig.priority) + + if (!config.priority.has_value()) { + assert(_defaultConfig.priority.has_value()); config.priority = _defaultConfig.priority; } return config; diff --git a/cpp/src/DataStorm/TopicI.h b/cpp/src/DataStorm/TopicI.h index 7647d88928d..2dcc702b9eb 100644 --- a/cpp/src/DataStorm/TopicI.h +++ b/cpp/src/DataStorm/TopicI.h @@ -18,13 +18,6 @@ namespace DataStormI class TopicI : public virtual Topic, public std::enable_shared_from_this { - struct ListenerKey - { - std::shared_ptr session; - - bool operator<(const ListenerKey& other) const { return session < other.session; } - }; - struct Listener { Listener(DataStormContract::SessionPrx sessionPrx) : proxy(std::move(sessionPrx)) {} @@ -58,7 +51,7 @@ namespace DataStormI DataStormContract::ElementSpecSeq getElementSpecs(std::int64_t, const DataStormContract::ElementInfoSeq&, const std::shared_ptr&); - void attach(std::int64_t, const std::shared_ptr&, DataStormContract::SessionPrx); + void attach(std::int64_t, std::shared_ptr, DataStormContract::SessionPrx); void detach(std::int64_t, const std::shared_ptr&); DataStormContract::ElementSpecAckSeq attachElements( @@ -116,8 +109,6 @@ namespace DataStormI void add(const std::shared_ptr&, const std::vector>&); void addFiltered(const std::shared_ptr&, const std::shared_ptr&); - void parseConfigImpl(const Ice::PropertyDict&, const std::string&, DataStorm::Config&) const; - friend class DataElementI; friend class DataReaderI; friend class FilteredDataReaderI; @@ -142,7 +133,7 @@ namespace DataStormI bool _destroyed; std::map, std::set>> _keyElements; std::map, std::set>> _filteredElements; - std::map _listeners; + std::map, Listener> _listeners; std::map, Updater> _updaters; size_t _listenerCount; mutable size_t _waiters; @@ -182,7 +173,7 @@ namespace DataStormI void destroy() final; private: - DataStorm::ReaderConfig parseConfig(const std::string&) const; + DataStorm::ReaderConfig parseConfig() const; DataStorm::ReaderConfig mergeConfigs(DataStorm::ReaderConfig) const; DataStorm::ReaderConfig _defaultConfig; @@ -210,7 +201,7 @@ namespace DataStormI void destroy() final; private: - DataStorm::WriterConfig parseConfig(const std::string&) const; + DataStorm::WriterConfig parseConfig() const; DataStorm::WriterConfig mergeConfigs(DataStorm::WriterConfig) const; DataStorm::WriterConfig _defaultConfig; diff --git a/cpp/src/Ice/PropertyNames.cpp b/cpp/src/Ice/PropertyNames.cpp index 11c69f5f23b..af095ef620b 100644 --- a/cpp/src/Ice/PropertyNames.cpp +++ b/cpp/src/Ice/PropertyNames.cpp @@ -572,6 +572,11 @@ const Property DataStormPropsData[] = Property{"Node.Server", "", false, false, &PropertyNames::ObjectAdapterProps}, Property{"Node.Server.Enabled", "1", false, false, nullptr}, Property{"Node.Server.ForwardDiscoveryToMulticast", "0", false, false, nullptr}, + Property{"Topic.ClearHistory", "OnAll", false, false, nullptr}, + Property{"Topic.DiscardPolicy", "Never", false, false, nullptr}, + Property{"Topic.Priority", "0", false, false, nullptr}, + Property{"Topic.SampleCount", "-1", false, false, nullptr}, + Property{"Topic.SampleLifetime", "0", false, false, nullptr}, Property{"Trace.Data", "0", false, false, nullptr}, Property{"Trace.Session", "0", false, false, nullptr}, Property{"Trace.Topic", "0", false, false, nullptr} @@ -583,7 +588,7 @@ const PropertyArray PropertyNames::DataStormProps .prefixOnly=false, .isOptIn=true, .properties=DataStormPropsData, - .length=13 + .length=18 }; const std::array PropertyNames::validProps =