diff --git a/cpp/include/DataStorm/DataStorm.h b/cpp/include/DataStorm/DataStorm.h index 640f68493c5..1580eee74aa 100644 --- a/cpp/include/DataStorm/DataStorm.h +++ b/cpp/include/DataStorm/DataStorm.h @@ -1988,7 +1988,7 @@ namespace DataStorm const Ice::CommunicatorPtr& communicator) { Value value; - if(previous) + if (previous) { value = Cloner::clone( std::static_pointer_cast>(previous)->getValue()); diff --git a/cpp/include/DataStorm/InternalT.h b/cpp/include/DataStorm/InternalT.h index aefc4088b1f..dd76439ab64 100644 --- a/cpp/include/DataStorm/InternalT.h +++ b/cpp/include/DataStorm/InternalT.h @@ -177,7 +177,7 @@ namespace DataStormI protected: friend struct Deleter; - std::shared_ptr getImpl(long long id) const + std::shared_ptr getImpl(std::int64_t id) const { std::lock_guard lock(_mutex); auto p = _elementsById.find(id); diff --git a/cpp/src/DataStorm/CallbackExecutor.h b/cpp/src/DataStorm/CallbackExecutor.h index ec166185cc1..9385a19669d 100644 --- a/cpp/src/DataStorm/CallbackExecutor.h +++ b/cpp/src/DataStorm/CallbackExecutor.h @@ -32,7 +32,7 @@ namespace DataStormI bool _flush; bool _destroyed; std::vector> _queue; - // An optional executor or null if no o custom executor is provided during Node construction. + // An optional executor or null if no custom executor is provided during Node construction. std::function call)> _customExecutor; }; } diff --git a/cpp/src/DataStorm/Contract.ice b/cpp/src/DataStorm/Contract.ice index 0b2f98e62cc..90508858dfd 100644 --- a/cpp/src/DataStorm/Contract.ice +++ b/cpp/src/DataStorm/Contract.ice @@ -11,12 +11,19 @@ module DataStormContract { + /// The ClearHistoryPolicy enumeration defines the policy that determines when a reader clears its + /// DataSample history in response to various events. enum ClearHistoryPolicy { + /// The reader clears its history when a new DataSample is added. OnAdd, + /// The reader clears its history when a DataSample is removed. OnRemove, + /// The reader clears its history when any DataSample event occurs. OnAll, + /// The reader clears its history when any DataSample event occurs, except for PartialUpdate events. OnAllExceptPartialUpdate, + /// The reader never clears its history. Never } @@ -95,13 +102,13 @@ module DataStormContract /// The topic update tags. ElementInfoSeq tags; - }; + } struct FilterInfo { string name; Ice::ByteSeq criteria; - }; + } class ElementConfig(1) { @@ -113,7 +120,7 @@ module DataStormContract optional(10) int sampleCount; optional(11) int sampleLifetime; optional(12) ClearHistoryPolicy clearHistory; - }; + } struct ElementData { @@ -193,7 +200,13 @@ module DataStormContract interface Session { + /// Called by sessions to announce topics to the peer. A publisher session announces the topics it writes, + /// while a subscriber session announces the topics it reads. + /// + /// @param topics The topics to announce. + /// @param initialize currently unused. void announceTopics(TopicInfoSeq topics, bool initialize); + void attachTopic(TopicSpec topic); void detachTopic(long topic); @@ -230,16 +243,16 @@ module DataStormContract /// publisher node through a SubscriberSession proxy. interface Node { - /// Initiate the creation of a publisher session with a node, after - /// the target node has announced a topic reader for which this node has a corresponding topic writer. + /// Initiate the creation of a publisher session with a node, after the target node has announced a topic + /// reader for which this node has a corresponding topic writer. /// /// @param publisher The publisher node initiating the session. The proxy is never null. /// @see Lookup::announceTopicReader void initiateCreateSession(Node* publisher); - /// Initiate the creation of a subscriber session with a node, after - /// the target node has announced a topic writer for which this node has a corresponding topic reader, - /// or after the node has called Node::initiateCreateSession. + /// Initiate the creation of a subscriber session with a node, after the target node has announced a topic + /// writer for which this node has a corresponding topic reader, or after the node has called + /// Node::initiateCreateSession. /// /// @param subscriber The subscriber node initiating the session. The proxy is never null. /// @param session The subscriber session being created. The proxy is never null. diff --git a/cpp/src/DataStorm/DataElementI.cpp b/cpp/src/DataStorm/DataElementI.cpp index b84200dab23..b8058b7e217 100644 --- a/cpp/src/DataStorm/DataElementI.cpp +++ b/cpp/src/DataStorm/DataElementI.cpp @@ -170,7 +170,7 @@ DataElementI::attach( (id < 0 && attachFilter(topicId, data.id, key, sampleFilter, session, prx, facet, id, filter, name, priority))) { auto q = data.lastIds.find(_id); - long long lastId = q != data.lastIds.end() ? q->second : 0; + int64_t lastId = q != data.lastIds.end() ? q->second : 0; samples.push_back(getSamples(key, sampleFilter, data.config, lastId, now)); } @@ -210,12 +210,14 @@ DataElementI::attachKey( _executor->queue([self = shared_from_this(), name] { self->_onConnectedElements(DataStorm::CallbackReason::Connect, name); }); } + if (addConnectedKey(key, subscriber)) { if (key) { subscriber->keys.insert(key); } + if (_traceLevels->data > 1) { Trace out(_traceLevels, _traceLevels->dataCat); diff --git a/cpp/src/DataStorm/NodeI.cpp b/cpp/src/DataStorm/NodeI.cpp index 326f310d132..345ede60d5b 100644 --- a/cpp/src/DataStorm/NodeI.cpp +++ b/cpp/src/DataStorm/NodeI.cpp @@ -154,19 +154,23 @@ NodeI::createSession( Ice::checkNotNull(subscriber, __FILE__, __LINE__, current); Ice::checkNotNull(subscriberSession, __FILE__, __LINE__, current); + auto instance = _instance.lock(); + if (!instance) + { + // Ignore the Node is being destroyed. + return; + } + shared_ptr session; try { NodePrx s = *subscriber; if (fromRelay) { - // - // If the call is from a relay, we check if we already have a connection to this node - // and eventually re-use it. Otherwise, we'll try to establish a connection to the node - // if it has endpoints. If it doesn't, we'll re-use the current connection to send the - // confirmation. - // - s = getNodeWithExistingConnection(s, current.con); + // If the call is from a relay, we check if we already have a connection to this node and eventually re-use + // it. Otherwise, we'll try to establish a connection to the node if it has endpoints. If it doesn't, we'll + // re-use the current connection to send the confirmation. + s = getNodeWithExistingConnection(std::move(instance), s, current.con); } else if (current.con) { @@ -262,9 +266,16 @@ NodeI::createSubscriberSession( const Ice::ConnectionPtr& connection, const shared_ptr& session) { + auto instance = _instance.lock(); + if (!instance) + { + // Ignore the Node is being destroyed. + return; + } + try { - subscriber = getNodeWithExistingConnection(subscriber, connection); + subscriber = getNodeWithExistingConnection(std::move(instance), subscriber, connection); auto self = shared_from_this(); #if defined(__GNUC__) @@ -297,9 +308,16 @@ NodeI::createSubscriberSession( void NodeI::createPublisherSession(NodePrx publisher, const Ice::ConnectionPtr& con, shared_ptr session) { + auto instance = _instance.lock(); + if (!instance) + { + // Ignore the Node is being destroyed. + return; + } + try { - auto p = getNodeWithExistingConnection(publisher, con); + auto p = getNodeWithExistingConnection(std::move(instance), publisher, con); unique_lock lock(_mutex); if (!session) @@ -516,18 +534,14 @@ NodeI::forwardToPublishers(const Ice::ByteSeq& inParams, const Ice::Current& cur } NodePrx -NodeI::getNodeWithExistingConnection(NodePrx node, const Ice::ConnectionPtr& con) +NodeI::getNodeWithExistingConnection(const shared_ptr& instance, NodePrx node, const Ice::ConnectionPtr& con) { Ice::ConnectionPtr connection; // If the node has a session with this node, use a bi-dir proxy associated with node session's connection. - auto instance = _instance.lock(); - if (instance) + if (auto nodeSession = instance->getNodeSessionManager()->getSession(node->ice_getIdentity())) { - if (auto nodeSession = instance->getNodeSessionManager()->getSession(node->ice_getIdentity())) - { - connection = nodeSession->getConnection(); - } + connection = nodeSession->getConnection(); } // Otherwise, check if the node already has a session established and use the connection from the session. diff --git a/cpp/src/DataStorm/NodeI.h b/cpp/src/DataStorm/NodeI.h index 394a37e4e65..40eda295998 100644 --- a/cpp/src/DataStorm/NodeI.h +++ b/cpp/src/DataStorm/NodeI.h @@ -66,7 +66,10 @@ namespace DataStormI std::shared_ptr getSession(const Ice::Identity&) const; - DataStormContract::NodePrx getNodeWithExistingConnection(DataStormContract::NodePrx, const Ice::ConnectionPtr&); + DataStormContract::NodePrx getNodeWithExistingConnection( + const std::shared_ptr& instance, + DataStormContract::NodePrx node, + const Ice::ConnectionPtr& connection); DataStormContract::NodePrx getProxy() const { return _proxy; } diff --git a/cpp/src/DataStorm/SessionI.cpp b/cpp/src/DataStorm/SessionI.cpp index 4fe17231aa0..7b28e4676aa 100644 --- a/cpp/src/DataStorm/SessionI.cpp +++ b/cpp/src/DataStorm/SessionI.cpp @@ -55,11 +55,8 @@ SessionI::init() { _id = Ice::identityToString(_proxy->ice_getIdentity()); - // - // Even though the node register a default servant for sessions, we still need to - // register the session servant explicitly here to ensure collocation works. The - // default servant from the node is used for facet calls. - // + // Even though the node register a default servant for sessions, we still need to register the session servant + // explicitly here to ensure collocation works. The default servant from the node is used for facet calls. _instance->getObjectAdapter()->add( make_shared(shared_from_this(), _instance->getCallbackExecutor()), _proxy->ice_getIdentity()); @@ -74,11 +71,8 @@ SessionI::init() void SessionI::announceTopics(TopicInfoSeq topics, bool, const Ice::Current&) { - // - // Retain topics outside the synchronization. This is necessary to ensure the topic destructor - // doesn't get called within the synchronization. The topic destructor can callback on the - // session to disconnect. - // + // Retain topics outside the synchronization. This is necessary to ensure the topic destructor doesn't get called + // within the synchronization. The topic destructor can callback on the session to disconnect. vector> retained; { lock_guard lock(_mutex); @@ -100,6 +94,8 @@ SessionI::announceTopics(TopicInfoSeq topics, bool, const Ice::Current&) retained, [&](const shared_ptr& topic) { + // Topic attach will subscribe to the new reader or writer using the session proxy. It does nothing + // for existing readers or writers. for (auto id : info.ids) { topic->attach(id, shared_from_this(), *_session); @@ -127,11 +123,8 @@ SessionI::announceTopics(TopicInfoSeq topics, bool, const Ice::Current&) void SessionI::attachTopic(TopicSpec spec, const Ice::Current&) { - // - // Retain topics outside the synchronization. This is necessary to ensure the topic destructor - // doesn't get called within the synchronization. The topic destructor can callback on the - // session to disconnect. - // + // Retain topics outside the synchronization. This is necessary to ensure the topic destructor doesn't get called + // within the synchronization. The topic destructor can callback on the session to disconnect. vector> retained; { lock_guard lock(_mutex); diff --git a/cpp/src/DataStorm/SessionI.h b/cpp/src/DataStorm/SessionI.h index e797ef51a11..19b950435ff 100644 --- a/cpp/src/DataStorm/SessionI.h +++ b/cpp/src/DataStorm/SessionI.h @@ -86,7 +86,7 @@ namespace DataStormI { return &p->second; } - return 0; + return nullptr; } bool reap(int sessionInstanceId) @@ -139,7 +139,7 @@ namespace DataStormI auto p = _elements.find(id); if (p == _elements.end()) { - return 0; + return nullptr; } return &p->second; } diff --git a/cpp/src/DataStorm/TopicI.cpp b/cpp/src/DataStorm/TopicI.cpp index 33dbb82e862..4a96bb142ee 100644 --- a/cpp/src/DataStorm/TopicI.cpp +++ b/cpp/src/DataStorm/TopicI.cpp @@ -329,6 +329,7 @@ TopicI::attachElements( filter = _keyFilterFactories->decode(_instance->getCommunicator(), spec.name, spec.value); } } + for (auto e : p->second) { ElementDataAckSeq acks; @@ -343,6 +344,7 @@ TopicI::attachElements( e->attach(topicId, spec.id, key, filter, session, prx, data, now, acks); } } + if (!acks.empty()) { specs.push_back( @@ -457,6 +459,7 @@ TopicI::attachElementsAck( { initCb = e->attach(topicId, spec.id, key, filter, session, prx, data, now, samples); } + if (initCb) { initCallbacks.push_back(initCb); @@ -465,6 +468,7 @@ TopicI::attachElementsAck( break; } } + if (!found) { removedIds.push_back(data.peerId); @@ -540,11 +544,8 @@ TopicI::attachElementsAck( } } - // - // Initialize samples on data elements once all the elements have been attached. This is - // important for the priority configuration in case 2 writers with different priorities are - // attached from the same session. - // + // Initialize samples on data elements once all the elements have been attached. This is important for the priority + // configuration in case 2 writers with different priorities are attached from the same session. for (auto initCb : initCallbacks) { initCb(); diff --git a/cpp/test/DataStorm/reliability/Writer.cpp b/cpp/test/DataStorm/reliability/Writer.cpp index a0f1f9e80f1..85b2ac95bf6 100644 --- a/cpp/test/DataStorm/reliability/Writer.cpp +++ b/cpp/test/DataStorm/reliability/Writer.cpp @@ -1,8 +1,6 @@ -// ********************************************************************** // // Copyright (c) ZeroC, Inc. All rights reserved. // -// ********************************************************************** #include "DataStorm/DataStorm.h" #include "TestHelper.h"