Skip to content

Commit

Permalink
DataStorm fixes for configuration, session retries, and more (#3166)
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone authored Nov 19, 2024
1 parent d3d0d1f commit f9b8b96
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 171 deletions.
5 changes: 5 additions & 0 deletions config/PropertyNames.xml
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,11 @@
<property name="Node.Server" class="ObjectAdapter" languages="cpp" />
<property name="Node.Server.Enabled" default="1" languages="cpp" />
<property name="Node.Server.ForwardDiscoveryToMulticast" default="0" languages="cpp" />
<property name="Topic.ClearHistory" default="OnAll" languages="cpp" />
<property name="Topic.DiscardPolicy" default="Never" languages="cpp" />
<property name="Topic.Priority" default="0" languages="cpp" />
<property name="Topic.SampleCount" default="-1" languages="cpp" />
<property name="Topic.SampleLifetime" default="0" languages="cpp" />
<property name="Trace.Data" default="0" languages="cpp" />
<property name="Trace.Session" default="0" languages="cpp" />
<property name="Trace.Topic" default="0" languages="cpp" />
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/DataStorm/NodeI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ NodeI::createSession(
nullptr,
[=](auto ex) { self->removePublisherSession(*subscriber, session, ex); });
assert(!s->ice_getCachedConnection() || s->ice_getCachedConnection() == connection);

// Session::connected informs the subscriber session of all the topic writers in the current node.
session->connected(
*subscriberSession,
connection,
Expand Down Expand Up @@ -257,6 +259,7 @@ NodeI::confirmCreateSession(
publisherSession = publisherSession->ice_fixed(current.con);
}

// Session::connected informs the publisher session of all the topic readers in the current node.
session->connected(*publisherSession, current.con, getInstance()->getTopicFactory()->getTopicReaders());
}

Expand Down
76 changes: 27 additions & 49 deletions cpp/src/DataStorm/SessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,27 +512,23 @@ SessionI::disconnected(const Ice::Current& current)
}

void
SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& connection, const TopicInfoSeq& topics)
SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& newConnection, const TopicInfoSeq& topics)
{
lock_guard<mutex> lock(_mutex);
if (_destroyed || _session)
{
assert(_connectedCallbacks.empty());
return;
}

_session = session;
_connection = connection;
if (connection)
_connection = newConnection;
if (newConnection)
{
auto self = shared_from_this();

#if defined(__GNUC__)
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wshadow"
#endif
// Register a callback with the connection manager to reconnect the session if the connection is closed.
_instance->getConnectionManager()->add(
connection,
newConnection,
self,
[self](auto connection, auto ex)
{
Expand All @@ -544,9 +540,6 @@ SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& connection, co
}
}
});
#if defined(__GNUC__)
# pragma GCC diagnostic pop
#endif
}

if (_retryTask)
Expand All @@ -571,19 +564,14 @@ SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& connection, co
{
try
{
// Announce the topics to the peer, don't wait for the result.
_session->announceTopicsAsync(topics, true, nullptr);
}
catch (const Ice::LocalException&)
{
// Ignore
}
}

for (auto c : _connectedCallbacks)
{
c(_proxy);
}
_connectedCallbacks.clear();
}

bool
Expand All @@ -592,6 +580,7 @@ SessionI::disconnected(const Ice::ConnectionPtr& connection, exception_ptr ex)
lock_guard<mutex> lock(_mutex);
if (_destroyed || (connection && _connection != connection) || !_session)
{
// Ignore: the session is either already destroyed or disconnected, or a new connection has already been established.
return false;
}

Expand All @@ -605,21 +594,22 @@ SessionI::disconnected(const Ice::ConnectionPtr& connection, exception_ptr ex)
}
else
{
throw Ice::CloseConnectionException(__FILE__, __LINE__);
throw Ice::CloseConnectionException{__FILE__, __LINE__};
}
}
catch (const std::exception& e)
{
Trace out(_traceLevels, _traceLevels->sessionCat);
out << _id << ": session `" << _session->ice_getIdentity() << "' disconnected:\n";
out << _id << ": session '" << _session->ice_getIdentity() << "' disconnected:\n";
out << (_connection ? _connection->toString() : "<no connection>") << "\n";
out << e.what();
}
}

auto self = shared_from_this();
for (auto& t : _topics)
{
runWithTopics(t.first, [&](TopicI* topic, TopicSubscriber&) { topic->detach(t.first, shared_from_this()); });
runWithTopics(t.first, [id = t.first, self](TopicI* topic, TopicSubscriber&) { topic->detach(id, self); });
}

_session = nullopt;
Expand All @@ -635,9 +625,7 @@ SessionI::retry(NodePrx node, exception_ptr exception)

if (exception)
{
//
// Don't retry if we are shutting down.
//
try
{
rethrow_exception(exception);
Expand All @@ -655,20 +643,17 @@ SessionI::retry(NodePrx node, exception_ptr exception)
}
}

if (node->ice_getEndpoints().empty() && node->ice_getAdapterId().empty())
// Cancel any pending retry task, before we start a new one below.
if (_retryTask)
{
if (_retryTask)
{
_instance->cancelTimerTask(_retryTask);
_retryTask = nullptr;
}
_retryCount = 0;
_instance->cancelTimerTask(_retryTask);
_retryTask = nullptr;
}

//
// If we can't retry connecting to the node because we don't have its endpoints,
// we just wait for the duration of the last retry delay for the peer to reconnect.
// If it doesn't reconnect, we'll destroy this session after the timeout.
//
if (node->ice_getEndpoints().empty() && node->ice_getAdapterId().empty())
{
// We cannot retry because we don't have the peer endpoints. Wait twice the last retry interval for the peer to
// reconnect.
auto delay = _instance->getRetryDelay(_instance->getRetryCount()) * 2;

if (_traceLevels->session > 0)
Expand All @@ -678,16 +663,15 @@ SessionI::retry(NodePrx node, exception_ptr exception)
<< " (ms) for peer to reconnect";
}

// Schedule a timer to remove the session if the peer doesn't reconnect.
_retryTask = make_shared<IceInternal::InlineTimerTask>([self = shared_from_this()] { self->remove(); });
_instance->scheduleTimerTask(_retryTask, delay);
}
else
{
//
// If we can retry the connection attempt, we schedule a timer to retry. Always
// retry immediately on the first attempt.
//
// Schedule a timer to retry. Always retry immediately on the first attempt.
auto delay = _retryCount == 0 ? 0ms : _instance->getRetryDelay(_retryCount - 1);
// Increment the retry count; it is reset by disconnected.
++_retryCount;

if (_traceLevels->session > 0)
Expand All @@ -703,13 +687,14 @@ SessionI::retry(NodePrx node, exception_ptr exception)
out << _id << ": connection to `" << node->ice_toString()
<< "` failed and the retry limit has been reached";
}

if (exception)
{
try
{
rethrow_exception(exception);
}
catch (const Ice::LocalException& ex)
catch (const std::exception& ex)
{
out << '\n' << ex.what();
}
Expand Down Expand Up @@ -770,21 +755,14 @@ SessionI::destroyImpl(const exception_ptr& ex)
_session = nullopt;
_connection = nullptr;

auto self = shared_from_this();
for (const auto& t : _topics)
{
runWithTopics(
t.first,
[&](TopicI* topic, TopicSubscriber&) { topic->detach(t.first, shared_from_this()); });
runWithTopics(t.first, [id = t.first, self](TopicI* topic, TopicSubscriber&) { topic->detach(id, self); });
}
_topics.clear();
}

for (auto c : _connectedCallbacks)
{
c(nullopt);
}
_connectedCallbacks.clear();

try
{
_instance->getObjectAdapter()->remove(_proxy->ice_getIdentity());
Expand Down
1 change: 0 additions & 1 deletion cpp/src/DataStorm/SessionI.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,6 @@ namespace DataStormI

std::optional<DataStormContract::SessionPrx> _session;
Ice::ConnectionPtr _connection;
std::vector<std::function<void(std::optional<DataStormContract::SessionPrx>)>> _connectedCallbacks;
};

class SubscriberSessionI : public SessionI, public DataStormContract::SubscriberSession
Expand Down
Loading

0 comments on commit f9b8b96

Please sign in to comment.