Skip to content

Commit

Permalink
DataStorm lambda capture fixes (#3174)
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone authored Nov 25, 2024
1 parent 8f6f111 commit be1ca06
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 68 deletions.
3 changes: 2 additions & 1 deletion cpp/src/DataStorm/DataElementI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ DataElementI::attach(
session->subscriberInitialized(topicId, id > 0 ? data.id : -data.id, data.samples, key, shared_from_this());
if (!samplesI.empty())
{
return [=, this]() { initSamples(samplesI, topicId, data.id, priority, now, id < 0); };
return [=, self = shared_from_this()]()
{ self->initSamples(samplesI, topicId, data.id, priority, now, id < 0); };
}
return nullptr;
}
Expand Down
67 changes: 31 additions & 36 deletions cpp/src/DataStorm/NodeI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,47 +184,47 @@ NodeI::createSession(
return; // Shutting down or already connected
}

auto self = shared_from_this();
s->ice_getConnectionAsync(
[=, this](auto connection) mutable
[=, self = shared_from_this()](auto connection) mutable
{
if (session->checkSession())
{
return;
}

if (connection && !connection->getAdapter())
{
connection->setAdapter(getInstance()->getObjectAdapter());
}

if (connection)
{
if (!connection->getAdapter())
{
connection->setAdapter(self->getInstance()->getObjectAdapter());
}
subscriberSession = subscriberSession->ice_fixed(connection);
}

try
{
// Must be called before connected
s->confirmCreateSessionAsync(
_proxy,
self->_proxy,
session->getProxy<PublisherSessionPrx>(),
nullptr,
[=](auto ex) { self->removePublisherSession(*subscriber, session, ex); });
[self, subscriber, session](auto ex)
{ self->removePublisherSession(*subscriber, session, ex); });
assert(!s->ice_getCachedConnection() || s->ice_getCachedConnection() == connection);

// Session::connected informs the subscriber session of all the topic writers in the current node.
session->connected(
*subscriberSession,
connection,
getInstance()->getTopicFactory()->getTopicWriters());
self->getInstance()->getTopicFactory()->getTopicWriters());
}
catch (const Ice::LocalException&)
{
removePublisherSession(*subscriber, session, current_exception());
self->removePublisherSession(*subscriber, session, current_exception());
}
},
[=](auto ex) { self->removePublisherSession(*subscriber, session, ex); });
[self = shared_from_this(), subscriber, session](auto ex)
{ self->removePublisherSession(*subscriber, session, ex); });
}
catch (const Ice::LocalException&)
{
Expand Down Expand Up @@ -266,41 +266,33 @@ NodeI::confirmCreateSession(
void
NodeI::createSubscriberSession(
NodePrx subscriber,
const Ice::ConnectionPtr& connection,
const Ice::ConnectionPtr& subscriberConnection,
const shared_ptr<PublisherSessionI>& session)
{
auto instance = _instance.lock();
if (!instance)
{
// Ignore the Node is being destroyed.
// Ignore the Node is being shutdown.
return;
}

try
{
subscriber = getNodeWithExistingConnection(std::move(instance), subscriber, connection);
subscriber = getNodeWithExistingConnection(std::move(instance), subscriber, subscriberConnection);

auto self = shared_from_this();
#if defined(__GNUC__)
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wshadow"
#endif
subscriber->ice_getConnectionAsync(
[=, this](auto connection)
[=, self = shared_from_this()](auto connection)
{
if (connection && !connection->getAdapter())
{
connection->setAdapter(getInstance()->getObjectAdapter());
connection->setAdapter(self->getInstance()->getObjectAdapter());
}
subscriber->initiateCreateSessionAsync(
_proxy,
self->_proxy,
nullptr,
[=](auto ex) { self->removePublisherSession(subscriber, session, ex); });
},
[=](auto ex) { self->removePublisherSession(subscriber, session, ex); });
#if defined(__GNUC__)
# pragma GCC diagnostic pop
#endif
[=, self = shared_from_this()](auto ex) { self->removePublisherSession(subscriber, session, ex); });
}
catch (const Ice::LocalException&)
{
Expand All @@ -309,18 +301,21 @@ NodeI::createSubscriberSession(
}

void
NodeI::createPublisherSession(NodePrx publisher, const Ice::ConnectionPtr& con, shared_ptr<SubscriberSessionI> session)
NodeI::createPublisherSession(
NodePrx publisher,
const Ice::ConnectionPtr& publisherConnection,
shared_ptr<SubscriberSessionI> session)
{
auto instance = _instance.lock();
if (!instance)
{
// Ignore the Node is being destroyed.
// Ignore the Node is being shutdown.
return;
}

try
{
auto p = getNodeWithExistingConnection(std::move(instance), publisher, con);
auto p = getNodeWithExistingConnection(std::move(instance), publisher, publisherConnection);

unique_lock<mutex> lock(_mutex);
if (!session)
Expand All @@ -332,9 +327,8 @@ NodeI::createPublisherSession(NodePrx publisher, const Ice::ConnectionPtr& con,
}
}

auto self = shared_from_this();
p->ice_getConnectionAsync(
[=, this](auto connection)
[=, self = shared_from_this()](auto connection)
{
if (session->checkSession())
{
Expand All @@ -343,24 +337,25 @@ NodeI::createPublisherSession(NodePrx publisher, const Ice::ConnectionPtr& con,

if (connection && !connection->getAdapter())
{
connection->setAdapter(getInstance()->getObjectAdapter());
connection->setAdapter(self->getInstance()->getObjectAdapter());
}

try
{
p->createSessionAsync(
_proxy,
self->_proxy,
session->getProxy<SubscriberSessionPrx>(),
false,
nullptr,
[=](exception_ptr ex) { self->removeSubscriberSession(publisher, session, ex); });
}
catch (const Ice::LocalException&)
{
removeSubscriberSession(publisher, session, current_exception());
self->removeSubscriberSession(publisher, session, current_exception());
}
},
[=](exception_ptr ex) { self->removeSubscriberSession(publisher, session, ex); });
[=, self = shared_from_this()](exception_ptr ex)
{ self->removeSubscriberSession(publisher, session, ex); });
}
catch (const Ice::LocalException&)
{
Expand Down
32 changes: 15 additions & 17 deletions cpp/src/DataStorm/NodeSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ NodeSessionManager::createOrGet(NodePrx node, const Ice::ConnectionPtr& connecti
session->init();
_sessions.emplace(node->ice_getIdentity(), session);

// TODO we should review this code, to avoid using the proxy shared_ptr as a map key.
// Specially the connection manager doesn't use this proxy for lookup.
// Register a callback with the connection manager to destroy the session when the connection is closed.
instance->getConnectionManager()->add(
connection,
make_shared<NodePrx>(node),
Expand Down Expand Up @@ -306,6 +305,13 @@ NodeSessionManager::forward(const Ice::ByteSeq& inParams, const Ice::Current& cu
void
NodeSessionManager::connect(LookupPrx lookup, NodePrx proxy)
{
auto instance = _instance.lock();
if (!instance)
{
// Ignore the Node is being shutdown.
return;
}

try
{
lookup->createSessionAsync(
Expand All @@ -326,6 +332,10 @@ NodeSessionManager::connect(LookupPrx lookup, NodePrx proxy)
[=, self = shared_from_this()](std::exception_ptr) { self->disconnected(lookup); });
}
catch (const Ice::CommunicatorDestroyedException&)
{
// Ignore node is being shutdown.
}
catch (const std::exception&)
{
disconnected(lookup);
}
Expand All @@ -338,6 +348,7 @@ NodeSessionManager::connected(NodePrx node, LookupPrx lookup)
auto instance = _instance.lock();
if (!instance)
{
// Ignore the Node is being shutdown.
return;
}

Expand Down Expand Up @@ -376,6 +387,7 @@ NodeSessionManager::connected(NodePrx node, LookupPrx lookup)
}
catch (const Ice::CommunicatorDestroyedException&)
{
// Ignore node is being shutdown.
}
}
}
Expand Down Expand Up @@ -407,21 +419,7 @@ NodeSessionManager::disconnected(LookupPrx lookup)
if (instance)
{
instance->scheduleTimerTask(
[=, self = shared_from_this()]
{
#if defined(__GNUC__)
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wshadow"
#endif
auto instance = self->_instance.lock();
if (instance)
{
self->connect(lookup, self->_nodePrx);
}
#if defined(__GNUC__)
# pragma GCC diagnostic pop
#endif
},
[=, self = shared_from_this()] { self->connect(lookup, self->_nodePrx); },
instance->getRetryDelay(_retryCount++));
}
}
Expand Down
10 changes: 4 additions & 6 deletions cpp/src/DataStorm/SessionI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -789,12 +789,10 @@ SessionI::checkSession()
{
if (_connection)
{
//
// Make sure the connection is still established. It's possible that the connection got closed
// and we're not notified yet by the connection manager. Check session explicitly check for the
// connection to make sure that if we get a session creation request from a peer (which might
// detect the connection closure before), it doesn't get ignored.
//
// Make sure the connection is still established. It's possible that the connection got closed and we
// were not notified yet by the connection manager. Check session explicitly check for the connection
// to make sure that if we get a session creation request from a peer (which might detect the connection
// closure before), it doesn't get ignored.
try
{
_connection->throwException();
Expand Down
8 changes: 0 additions & 8 deletions cpp/src/DataStorm/TopicFactoryI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ TopicFactoryI::createTopicReader(
catch (const Ice::ObjectAdapterDestroyedException&)
{
}
catch (const std::exception&)
{
assert(false);
}
return reader;
}

Expand Down Expand Up @@ -127,10 +123,6 @@ TopicFactoryI::createTopicWriter(
catch (const Ice::ObjectAdapterDestroyedException&)
{
}
catch (const std::exception&)
{
assert(false);
}

return writer;
}
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/DataStorm/TopicI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ TopicI::TopicI(
_instance(_factory.lock()->getInstance()),
_traceLevels(_instance->getTraceLevels()),
_id(id),
// The collocated forwarder is initalized here to avoid using a nullable proxy. The forwarder is only used by
// the instance that owns it and is removed in destroy implementation.
_forwarder{_instance->getCollocatedForwarder()->add<SessionPrx>(
[this](Ice::ByteSeq inParams, const Ice::Current& current) { forward(inParams, current); })},
_destroyed(false),
Expand Down

0 comments on commit be1ca06

Please sign in to comment.