-
Notifications
You must be signed in to change notification settings - Fork 593
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DataStorm fixes for configuration, session retries, and more #3166
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -512,27 +512,23 @@ SessionI::disconnected(const Ice::Current& current) | |
} | ||
|
||
void | ||
SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& connection, const TopicInfoSeq& topics) | ||
SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& newConnection, const TopicInfoSeq& topics) | ||
{ | ||
lock_guard<mutex> lock(_mutex); | ||
if (_destroyed || _session) | ||
{ | ||
assert(_connectedCallbacks.empty()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed unused |
||
return; | ||
} | ||
|
||
_session = session; | ||
_connection = connection; | ||
if (connection) | ||
_connection = newConnection; | ||
if (newConnection) | ||
{ | ||
auto self = shared_from_this(); | ||
|
||
#if defined(__GNUC__) | ||
# pragma GCC diagnostic push | ||
# pragma GCC diagnostic ignored "-Wshadow" | ||
#endif | ||
// Register a callback with the connection manager to reconnect the session if the connection is closed. | ||
_instance->getConnectionManager()->add( | ||
connection, | ||
newConnection, | ||
self, | ||
[self](auto connection, auto ex) | ||
{ | ||
|
@@ -544,9 +540,6 @@ SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& connection, co | |
} | ||
} | ||
}); | ||
#if defined(__GNUC__) | ||
# pragma GCC diagnostic pop | ||
#endif | ||
} | ||
|
||
if (_retryTask) | ||
|
@@ -571,19 +564,14 @@ SessionI::connected(SessionPrx session, const Ice::ConnectionPtr& connection, co | |
{ | ||
try | ||
{ | ||
// Announce the topics to the peer, don't wait for the result. | ||
_session->announceTopicsAsync(topics, true, nullptr); | ||
} | ||
catch (const Ice::LocalException&) | ||
{ | ||
// Ignore | ||
} | ||
} | ||
|
||
for (auto c : _connectedCallbacks) | ||
{ | ||
c(_proxy); | ||
} | ||
_connectedCallbacks.clear(); | ||
} | ||
|
||
bool | ||
|
@@ -592,6 +580,7 @@ SessionI::disconnected(const Ice::ConnectionPtr& connection, exception_ptr ex) | |
lock_guard<mutex> lock(_mutex); | ||
if (_destroyed || (connection && _connection != connection) || !_session) | ||
{ | ||
// Ignore: we are either already destroyed, or disconnected, or a new connection has already been established. | ||
return false; | ||
} | ||
|
||
|
@@ -605,21 +594,22 @@ SessionI::disconnected(const Ice::ConnectionPtr& connection, exception_ptr ex) | |
} | ||
else | ||
{ | ||
throw Ice::CloseConnectionException(__FILE__, __LINE__); | ||
throw Ice::CloseConnectionException{__FILE__, __LINE__}; | ||
} | ||
} | ||
catch (const std::exception& e) | ||
{ | ||
Trace out(_traceLevels, _traceLevels->sessionCat); | ||
out << _id << ": session `" << _session->ice_getIdentity() << "' disconnected:\n"; | ||
out << _id << ": session '" << _session->ice_getIdentity() << "' disconnected:\n"; | ||
out << (_connection ? _connection->toString() : "<no connection>") << "\n"; | ||
out << e.what(); | ||
} | ||
} | ||
|
||
auto self = shared_from_this(); | ||
for (auto& t : _topics) | ||
{ | ||
runWithTopics(t.first, [&](TopicI* topic, TopicSubscriber&) { topic->detach(t.first, shared_from_this()); }); | ||
runWithTopics(t.first, [id = t.first, self](TopicI* topic, TopicSubscriber&) { topic->detach(id, self); }); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor change, no need to call shared_from_this in the loop or capture everything by ref. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you declare self in the capture block to avoid an extra copy? |
||
} | ||
|
||
_session = nullopt; | ||
|
@@ -635,9 +625,7 @@ SessionI::retry(NodePrx node, exception_ptr exception) | |
|
||
if (exception) | ||
{ | ||
// | ||
// Don't retry if we are shutting down. | ||
// | ||
try | ||
{ | ||
rethrow_exception(exception); | ||
|
@@ -655,20 +643,17 @@ SessionI::retry(NodePrx node, exception_ptr exception) | |
} | ||
} | ||
|
||
if (node->ice_getEndpoints().empty() && node->ice_getAdapterId().empty()) | ||
// Cancel any pending retry task, before we start a new one below. | ||
if (_retryTask) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The previous code only canceled the retry task when there were no endpoints. Since we are about to schedule a new task here, I think it is clearer to cancel any pending task. |
||
{ | ||
if (_retryTask) | ||
{ | ||
_instance->cancelTimerTask(_retryTask); | ||
_retryTask = nullptr; | ||
} | ||
_retryCount = 0; | ||
_instance->cancelTimerTask(_retryTask); | ||
_retryTask = nullptr; | ||
} | ||
|
||
// | ||
// If we can't retry connecting to the node because we don't have its endpoints, | ||
// we just wait for the duration of the last retry delay for the peer to reconnect. | ||
// If it doesn't reconnect, we'll destroy this session after the timeout. | ||
// | ||
if (node->ice_getEndpoints().empty() && node->ice_getAdapterId().empty()) | ||
{ | ||
// We cannot retry because we don't have the peer endpoints. Wait twice the last retry interval for the peer to | ||
// reconnect. | ||
auto delay = _instance->getRetryDelay(_instance->getRetryCount()) * 2; | ||
|
||
if (_traceLevels->session > 0) | ||
|
@@ -678,16 +663,15 @@ SessionI::retry(NodePrx node, exception_ptr exception) | |
<< " (ms) for peer to reconnect"; | ||
} | ||
|
||
// Schedule a timer to remove the session if the peer doesn't reconnect. | ||
_retryTask = make_shared<IceInternal::InlineTimerTask>([self = shared_from_this()] { self->remove(); }); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As far as I can see, we had an endless loop here of calling retry to print this message and scheduling the remove call, which just calls retry again. |
||
_instance->scheduleTimerTask(_retryTask, delay); | ||
} | ||
else | ||
{ | ||
// | ||
// If we can retry the connection attempt, we schedule a timer to retry. Always | ||
// retry immediately on the first attempt. | ||
// | ||
// Schedule a timer to retry. Always retry immediately on the first attempt. | ||
auto delay = _retryCount == 0 ? 0ms : _instance->getRetryDelay(_retryCount - 1); | ||
// Increment the retry count, it is reset by disconnected. | ||
++_retryCount; | ||
|
||
if (_traceLevels->session > 0) | ||
|
@@ -703,13 +687,14 @@ SessionI::retry(NodePrx node, exception_ptr exception) | |
out << _id << ": connection to `" << node->ice_toString() | ||
<< "` failed and the retry limit has been reached"; | ||
} | ||
|
||
if (exception) | ||
{ | ||
try | ||
{ | ||
rethrow_exception(exception); | ||
} | ||
catch (const Ice::LocalException& ex) | ||
catch (const std::exception& ex) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is to match what we do above, when we handle what exceptions to retry. |
||
{ | ||
out << '\n' << ex.what(); | ||
} | ||
|
@@ -770,21 +755,14 @@ SessionI::destroyImpl(const exception_ptr& ex) | |
_session = nullopt; | ||
_connection = nullptr; | ||
|
||
auto self = shared_from_this(); | ||
for (const auto& t : _topics) | ||
{ | ||
runWithTopics( | ||
t.first, | ||
[&](TopicI* topic, TopicSubscriber&) { topic->detach(t.first, shared_from_this()); }); | ||
runWithTopics(t.first, [id = t.first, self](TopicI* topic, TopicSubscriber&) { topic->detach(id, self); }); | ||
} | ||
_topics.clear(); | ||
} | ||
|
||
for (auto c : _connectedCallbacks) | ||
{ | ||
c(nullopt); | ||
} | ||
_connectedCallbacks.clear(); | ||
|
||
try | ||
{ | ||
_instance->getObjectAdapter()->remove(_proxy->ice_getIdentity()); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Topic properties were not added before, but they are public and documented in
https://doc.zeroc.com/datastorm/latest/property-reference/datastorm-topic
I updated the docs to include the -1 default for SampleCount; this was missing in the docs.