Skip to content

Commit

Permalink
DataStorm implementation minor fixes (#3162)
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone authored Nov 18, 2024
1 parent 51ca2d1 commit 372b2f1
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 53 deletions.
2 changes: 1 addition & 1 deletion cpp/include/DataStorm/DataStorm.h
Original file line number Diff line number Diff line change
Expand Up @@ -1988,7 +1988,7 @@ namespace DataStorm
const Ice::CommunicatorPtr& communicator)
{
Value value;
if(previous)
if (previous)
{
value = Cloner<Value>::clone(
std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(previous)->getValue());
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/DataStorm/InternalT.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ namespace DataStormI
protected:
friend struct Deleter;

std::shared_ptr<typename V::BaseClassType> getImpl(long long id) const
std::shared_ptr<typename V::BaseClassType> getImpl(std::int64_t id) const
{
std::lock_guard<std::mutex> lock(_mutex);
auto p = _elementsById.find(id);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/DataStorm/CallbackExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace DataStormI
bool _flush;
bool _destroyed;
std::vector<std::function<void()>> _queue;
// An optional executor or null if no o custom executor is provided during Node construction.
// An optional executor or null if no custom executor is provided during Node construction.
std::function<void(std::function<void()> call)> _customExecutor;
};
}
Expand Down
29 changes: 21 additions & 8 deletions cpp/src/DataStorm/Contract.ice
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,19 @@

module DataStormContract
{
/// The ClearHistoryPolicy enumeration defines the policy that determines when a reader clears its
/// DataSample history in response to various events.
enum ClearHistoryPolicy
{
/// The reader clears its history when a new DataSample is added.
OnAdd,
/// The reader clears its history when a DataSample is removed.
OnRemove,
/// The reader clears its history when any DataSample event occurs.
OnAll,
/// The reader clears its history when any DataSample event occurs, except for PartialUpdate events.
OnAllExceptPartialUpdate,
/// The reader never clears its history.
Never
}

Expand Down Expand Up @@ -95,13 +102,13 @@ module DataStormContract

/// The topic update tags.
ElementInfoSeq tags;
};
}

struct FilterInfo
{
string name;
Ice::ByteSeq criteria;
};
}

class ElementConfig(1)
{
Expand All @@ -113,7 +120,7 @@ module DataStormContract
optional(10) int sampleCount;
optional(11) int sampleLifetime;
optional(12) ClearHistoryPolicy clearHistory;
};
}

struct ElementData
{
Expand Down Expand Up @@ -193,7 +200,13 @@ module DataStormContract

interface Session
{
/// Called by sessions to announce topics to the peer. A publisher session announces the topics it writes,
/// while a subscriber session announces the topics it reads.
///
/// @param topics The topics to announce.
/// @param initialize currently unused.
void announceTopics(TopicInfoSeq topics, bool initialize);

void attachTopic(TopicSpec topic);
void detachTopic(long topic);

Expand Down Expand Up @@ -230,16 +243,16 @@ module DataStormContract
/// publisher node through a SubscriberSession proxy.
interface Node
{
/// Initiate the creation of a publisher session with a node, after
/// the target node has announced a topic reader for which this node has a corresponding topic writer.
/// Initiate the creation of a publisher session with a node, after the target node has announced a topic
/// reader for which this node has a corresponding topic writer.
///
/// @param publisher The publisher node initiating the session. The proxy is never null.
/// @see Lookup::announceTopicReader
void initiateCreateSession(Node* publisher);

/// Initiate the creation of a subscriber session with a node, after
/// the target node has announced a topic writer for which this node has a corresponding topic reader,
/// or after the node has called Node::initiateCreateSession.
/// Initiate the creation of a subscriber session with a node, after the target node has announced a topic
/// writer for which this node has a corresponding topic reader, or after the node has called
/// Node::initiateCreateSession.
///
/// @param subscriber The subscriber node initiating the session. The proxy is never null.
/// @param session The subscriber session being created. The proxy is never null.
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/DataStorm/DataElementI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ DataElementI::attach(
(id < 0 && attachFilter(topicId, data.id, key, sampleFilter, session, prx, facet, id, filter, name, priority)))
{
auto q = data.lastIds.find(_id);
long long lastId = q != data.lastIds.end() ? q->second : 0;
int64_t lastId = q != data.lastIds.end() ? q->second : 0;
samples.push_back(getSamples(key, sampleFilter, data.config, lastId, now));
}

Expand Down Expand Up @@ -210,12 +210,14 @@ DataElementI::attachKey(
_executor->queue([self = shared_from_this(), name]
{ self->_onConnectedElements(DataStorm::CallbackReason::Connect, name); });
}

if (addConnectedKey(key, subscriber))
{
if (key)
{
subscriber->keys.insert(key);
}

if (_traceLevels->data > 1)
{
Trace out(_traceLevels, _traceLevels->dataCat);
Expand Down
46 changes: 30 additions & 16 deletions cpp/src/DataStorm/NodeI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,23 @@ NodeI::createSession(
Ice::checkNotNull(subscriber, __FILE__, __LINE__, current);
Ice::checkNotNull(subscriberSession, __FILE__, __LINE__, current);

auto instance = _instance.lock();
if (!instance)
{
// Ignore the Node is being destroyed.
return;
}

shared_ptr<PublisherSessionI> session;
try
{
NodePrx s = *subscriber;
if (fromRelay)
{
//
// If the call is from a relay, we check if we already have a connection to this node
// and eventually re-use it. Otherwise, we'll try to establish a connection to the node
// if it has endpoints. If it doesn't, we'll re-use the current connection to send the
// confirmation.
//
s = getNodeWithExistingConnection(s, current.con);
// If the call is from a relay, we check if we already have a connection to this node and eventually re-use
// it. Otherwise, we'll try to establish a connection to the node if it has endpoints. If it doesn't, we'll
// re-use the current connection to send the confirmation.
s = getNodeWithExistingConnection(std::move(instance), s, current.con);
}
else if (current.con)
{
Expand Down Expand Up @@ -262,9 +266,16 @@ NodeI::createSubscriberSession(
const Ice::ConnectionPtr& connection,
const shared_ptr<PublisherSessionI>& session)
{
auto instance = _instance.lock();
if (!instance)
{
// Ignore the Node is being destroyed.
return;
}

try
{
subscriber = getNodeWithExistingConnection(subscriber, connection);
subscriber = getNodeWithExistingConnection(std::move(instance), subscriber, connection);

auto self = shared_from_this();
#if defined(__GNUC__)
Expand Down Expand Up @@ -297,9 +308,16 @@ NodeI::createSubscriberSession(
void
NodeI::createPublisherSession(NodePrx publisher, const Ice::ConnectionPtr& con, shared_ptr<SubscriberSessionI> session)
{
auto instance = _instance.lock();
if (!instance)
{
// Ignore the Node is being destroyed.
return;
}

try
{
auto p = getNodeWithExistingConnection(publisher, con);
auto p = getNodeWithExistingConnection(std::move(instance), publisher, con);

unique_lock<mutex> lock(_mutex);
if (!session)
Expand Down Expand Up @@ -516,18 +534,14 @@ NodeI::forwardToPublishers(const Ice::ByteSeq& inParams, const Ice::Current& cur
}

NodePrx
NodeI::getNodeWithExistingConnection(NodePrx node, const Ice::ConnectionPtr& con)
NodeI::getNodeWithExistingConnection(const shared_ptr<Instance>& instance, NodePrx node, const Ice::ConnectionPtr& con)
{
Ice::ConnectionPtr connection;

// If the node has a session with this node, use a bi-dir proxy associated with node session's connection.
auto instance = _instance.lock();
if (instance)
if (auto nodeSession = instance->getNodeSessionManager()->getSession(node->ice_getIdentity()))
{
if (auto nodeSession = instance->getNodeSessionManager()->getSession(node->ice_getIdentity()))
{
connection = nodeSession->getConnection();
}
connection = nodeSession->getConnection();
}

// Otherwise, check if the node already has a session established and use the connection from the session.
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/DataStorm/NodeI.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ namespace DataStormI

std::shared_ptr<SessionI> getSession(const Ice::Identity&) const;

DataStormContract::NodePrx getNodeWithExistingConnection(DataStormContract::NodePrx, const Ice::ConnectionPtr&);
DataStormContract::NodePrx getNodeWithExistingConnection(
const std::shared_ptr<Instance>& instance,
DataStormContract::NodePrx node,
const Ice::ConnectionPtr& connection);

DataStormContract::NodePrx getProxy() const { return _proxy; }

Expand Down
23 changes: 8 additions & 15 deletions cpp/src/DataStorm/SessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,8 @@ SessionI::init()
{
_id = Ice::identityToString(_proxy->ice_getIdentity());

//
// Even though the node register a default servant for sessions, we still need to
// register the session servant explicitly here to ensure collocation works. The
// default servant from the node is used for facet calls.
//
// Even though the node register a default servant for sessions, we still need to register the session servant
// explicitly here to ensure collocation works. The default servant from the node is used for facet calls.
_instance->getObjectAdapter()->add(
make_shared<DispatchInterceptorI>(shared_from_this(), _instance->getCallbackExecutor()),
_proxy->ice_getIdentity());
Expand All @@ -74,11 +71,8 @@ SessionI::init()
void
SessionI::announceTopics(TopicInfoSeq topics, bool, const Ice::Current&)
{
//
// Retain topics outside the synchronization. This is necessary to ensure the topic destructor
// doesn't get called within the synchronization. The topic destructor can callback on the
// session to disconnect.
//
// Retain topics outside the synchronization. This is necessary to ensure the topic destructor doesn't get called
// within the synchronization. The topic destructor can callback on the session to disconnect.
vector<shared_ptr<TopicI>> retained;
{
lock_guard<mutex> lock(_mutex);
Expand All @@ -100,6 +94,8 @@ SessionI::announceTopics(TopicInfoSeq topics, bool, const Ice::Current&)
retained,
[&](const shared_ptr<TopicI>& topic)
{
// Topic attach will subscribe to the new reader or writer using the session proxy. It does nothing
// for existing readers or writers.
for (auto id : info.ids)
{
topic->attach(id, shared_from_this(), *_session);
Expand Down Expand Up @@ -127,11 +123,8 @@ SessionI::announceTopics(TopicInfoSeq topics, bool, const Ice::Current&)
void
SessionI::attachTopic(TopicSpec spec, const Ice::Current&)
{
//
// Retain topics outside the synchronization. This is necessary to ensure the topic destructor
// doesn't get called within the synchronization. The topic destructor can callback on the
// session to disconnect.
//
// Retain topics outside the synchronization. This is necessary to ensure the topic destructor doesn't get called
// within the synchronization. The topic destructor can callback on the session to disconnect.
vector<shared_ptr<TopicI>> retained;
{
lock_guard<mutex> lock(_mutex);
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/DataStorm/SessionI.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ namespace DataStormI
{
return &p->second;
}
return 0;
return nullptr;
}

bool reap(int sessionInstanceId)
Expand Down Expand Up @@ -139,7 +139,7 @@ namespace DataStormI
auto p = _elements.find(id);
if (p == _elements.end())
{
return 0;
return nullptr;
}
return &p->second;
}
Expand Down
11 changes: 6 additions & 5 deletions cpp/src/DataStorm/TopicI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ TopicI::attachElements(
filter = _keyFilterFactories->decode(_instance->getCommunicator(), spec.name, spec.value);
}
}

for (auto e : p->second)
{
ElementDataAckSeq acks;
Expand All @@ -343,6 +344,7 @@ TopicI::attachElements(
e->attach(topicId, spec.id, key, filter, session, prx, data, now, acks);
}
}

if (!acks.empty())
{
specs.push_back(
Expand Down Expand Up @@ -457,6 +459,7 @@ TopicI::attachElementsAck(
{
initCb = e->attach(topicId, spec.id, key, filter, session, prx, data, now, samples);
}

if (initCb)
{
initCallbacks.push_back(initCb);
Expand All @@ -465,6 +468,7 @@ TopicI::attachElementsAck(
break;
}
}

if (!found)
{
removedIds.push_back(data.peerId);
Expand Down Expand Up @@ -540,11 +544,8 @@ TopicI::attachElementsAck(
}
}

//
// Initialize samples on data elements once all the elements have been attached. This is
// important for the priority configuration in case 2 writers with different priorities are
// attached from the same session.
//
// Initialize samples on data elements once all the elements have been attached. This is important for the priority
// configuration in case 2 writers with different priorities are attached from the same session.
for (auto initCb : initCallbacks)
{
initCb();
Expand Down
2 changes: 0 additions & 2 deletions cpp/test/DataStorm/reliability/Writer.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// **********************************************************************
//
// Copyright (c) ZeroC, Inc. All rights reserved.
//
// **********************************************************************

#include "DataStorm/DataStorm.h"
#include "TestHelper.h"
Expand Down

0 comments on commit 372b2f1

Please sign in to comment.