DataStorm fixes for configuration, session retries, and more #3166

Merged 4 commits on Nov 19, 2024
5 changes: 5 additions & 0 deletions config/PropertyNames.xml
@@ -409,6 +409,11 @@
<property name="Node.Server" class="ObjectAdapter" languages="cpp" />
<property name="Node.Server.Enabled" default="1" languages="cpp" />
<property name="Node.Server.ForwardDiscoveryToMulticast" default="0" languages="cpp" />
<property name="Topic.ClearHistory" default="OnAll" languages="cpp" />
<property name="Topic.DiscardPolicy" default="Never" languages="cpp" />
<property name="Topic.Priority" default="0" languages="cpp" />
<property name="Topic.SampleCount" default="-1" languages="cpp" />
<property name="Topic.SampleLifetime" default="0" languages="cpp" />
Member Author:
The Topic properties were not added before, but they are public and documented at

https://doc.zeroc.com/datastorm/latest/property-reference/datastorm-topic

I also updated the docs to include the -1 default for SampleCount, which was missing.
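As a sketch of how these entries are used: assuming the standard `DataStorm.` property prefix described in the property reference, the topic properties above could be set in an Ice configuration file like this (values are illustrative, not recommendations):

```
# Illustrative topic settings; names come from the PropertyNames.xml
# entries above, values are examples only.
DataStorm.Topic.SampleCount=100      # default -1 (unlimited)
DataStorm.Topic.SampleLifetime=5000  # default 0 (no lifetime)
DataStorm.Topic.Priority=1           # default 0
```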

<property name="Trace.Data" default="0" languages="cpp" />
<property name="Trace.Session" default="0" languages="cpp" />
<property name="Trace.Topic" default="0" languages="cpp" />
3 changes: 3 additions & 0 deletions cpp/src/DataStorm/NodeI.cpp
@@ -212,6 +212,8 @@ NodeI::createSession(
nullptr,
[=](auto ex) { self->removePublisherSession(*subscriber, session, ex); });
assert(!s->ice_getCachedConnection() || s->ice_getCachedConnection() == connection);

// Session::connected informs the subscriber session of all the topic writers in the current node.
session->connected(
*subscriberSession,
connection,
@@ -257,6 +259,7 @@ NodeI::confirmCreateSession(
publisherSession = publisherSession->ice_fixed(current.con);
}

// Session::connected informs the publisher session of all the topic readers in the current node.
session->connected(*publisherSession, current.con, getInstance()->getTopicFactory()->getTopicReaders());
}

61 changes: 23 additions & 38 deletions cpp/src/DataStorm/SessionI.cpp
@@ -517,7 +517,6 @@ SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& connection, co
lock_guard<mutex> lock(_mutex);
if (_destroyed || _session)
{
assert(_connectedCallbacks.empty());
Member Author:

Removed the unused _connectedCallbacks; we never add any callbacks.

return;
}

@@ -531,6 +530,7 @@ SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& connection, co
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wshadow"
#endif
// Register a callback with the connection manager to reconnect the session if the connection is closed.
_instance->getConnectionManager()->add(
connection,
self,
@@ -571,19 +571,14 @@ SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& connection, co
{
try
{
// Announce the topics to the peer, don't wait for the result.
_session->announceTopicsAsync(topics, true, nullptr);
}
catch (const Ice::LocalException&)
{
// Ignore
}
}

for (auto c : _connectedCallbacks)
{
c(_proxy);
}
_connectedCallbacks.clear();
}

bool
@@ -592,6 +587,7 @@ SessionI::disconnected(const Ice::ConnectionPtr& connection, exception_ptr ex)
lock_guard<mutex> lock(_mutex);
if (_destroyed || (connection && _connection != connection) || !_session)
{
        // Ignore: we are either already destroyed or disconnected, or a new connection has already been established.
return false;
}

@@ -605,21 +601,22 @@ SessionI::disconnected(const Ice::ConnectionPtr& connection, exception_ptr ex)
}
else
{
throw Ice::CloseConnectionException(__FILE__, __LINE__);
throw Ice::CloseConnectionException{__FILE__, __LINE__};
}
}
catch (const std::exception& e)
{
Trace out(_traceLevels, _traceLevels->sessionCat);
out << _id << ": session `" << _session->ice_getIdentity() << "' disconnected:\n";
out << _id << ": session '" << _session->ice_getIdentity() << "' disconnected:\n";
out << (_connection ? _connection->toString() : "<no connection>") << "\n";
out << e.what();
}
}

auto self = shared_from_this();
for (auto& t : _topics)
{
runWithTopics(t.first, [&](TopicI* topic, TopicSubscriber&) { topic->detach(t.first, shared_from_this()); });
runWithTopics(t.first, [id = t.first, self](TopicI* topic, TopicSubscriber&) { topic->detach(id, self); });
Member Author:

Minor change: there is no need to call shared_from_this in the loop or to capture everything by reference.

Member:

Could you declare self in the capture block to avoid an extra copy?

}

_session = nullopt;
@@ -635,9 +632,7 @@ SessionI::retry(NodePrx node, exception_ptr exception)

if (exception)
{
//
// Don't retry if we are shutting down.
//
try
{
rethrow_exception(exception);
Expand All @@ -655,20 +650,17 @@ SessionI::retry(NodePrx node, exception_ptr exception)
}
}

if (node->ice_getEndpoints().empty() && node->ice_getAdapterId().empty())
// Cancel any pending retry task, before we start a new one below.
if (_retryTask)
Member Author:

The previous code only canceled the retry task when there were no endpoints. Since we are about to schedule a new task below, I think it is clearer to cancel any pending task.

{
if (_retryTask)
{
_instance->cancelTimerTask(_retryTask);
_retryTask = nullptr;
}
_retryCount = 0;
_instance->cancelTimerTask(_retryTask);
_retryTask = nullptr;
}

//
// If we can't retry connecting to the node because we don't have its endpoints,
// we just wait for the duration of the last retry delay for the peer to reconnect.
// If it doesn't reconnect, we'll destroy this session after the timeout.
//
if (node->ice_getEndpoints().empty() && node->ice_getAdapterId().empty())
{
// We cannot retry because we don't have the peer endpoints. Wait twice the last retry interval for the peer to
// reconnect.
auto delay = _instance->getRetryDelay(_instance->getRetryCount()) * 2;

if (_traceLevels->session > 0)
@@ -678,16 +670,15 @@ SessionI::retry(NodePrx node, exception_ptr exception)
<< " (ms) for peer to reconnect";
}

// Schedule a timer to remove the session if the peer doesn't reconnect.
_retryTask = make_shared<IceInternal::InlineTimerTask>([self = shared_from_this()] { self->remove(); });
Member Author:

Here SessionI::remove would call retry again, and we would schedule a new call to remove.

As far as I can see, we had an endless loop: retry prints this message and schedules the remove call, which just calls retry again.

_instance->scheduleTimerTask(_retryTask, delay);
}
else
{
//
// If we can retry the connection attempt, we schedule a timer to retry. Always
// retry immediately on the first attempt.
//
// Schedule a timer to retry. Always retry immediately on the first attempt.
auto delay = _retryCount == 0 ? 0ms : _instance->getRetryDelay(_retryCount - 1);
// Increment the retry count, it is reset by disconnected.
++_retryCount;

if (_traceLevels->session > 0)
Expand All @@ -703,13 +694,14 @@ SessionI::retry(NodePrx node, exception_ptr exception)
out << _id << ": connection to `" << node->ice_toString()
<< "` failed and the retry limit has been reached";
}

if (exception)
{
try
{
rethrow_exception(exception);
}
catch (const Ice::LocalException& ex)
catch (const std::exception& ex)
Member Author:

This matches what we do above when deciding which exceptions to retry.

{
out << '\n' << ex.what();
}
@@ -770,21 +762,14 @@ SessionI::destroyImpl(const exception_ptr& ex)
_session = nullopt;
_connection = nullptr;

auto self = shared_from_this();
for (const auto& t : _topics)
{
runWithTopics(
t.first,
[&](TopicI* topic, TopicSubscriber&) { topic->detach(t.first, shared_from_this()); });
runWithTopics(t.first, [id = t.first, self](TopicI* topic, TopicSubscriber&) { topic->detach(id, self); });
}
_topics.clear();
}

for (auto c : _connectedCallbacks)
{
c(nullopt);
}
_connectedCallbacks.clear();

try
{
_instance->getObjectAdapter()->remove(_proxy->ice_getIdentity());
1 change: 0 additions & 1 deletion cpp/src/DataStorm/SessionI.h
@@ -345,7 +345,6 @@ namespace DataStormI

std::optional<DataStormContract::SessionPrx> _session;
Ice::ConnectionPtr _connection;
std::vector<std::function<void(std::optional<DataStormContract::SessionPrx>)>> _connectedCallbacks;
};

class SubscriberSessionI : public SessionI, public DataStormContract::SubscriberSession