From be1ca06bdb4cc85d7626e9ebb30772260034c25c Mon Sep 17 00:00:00 2001 From: Jose Date: Mon, 25 Nov 2024 11:03:55 +0100 Subject: [PATCH] DataStorm lambda capture fixes (#3174) --- cpp/src/DataStorm/DataElementI.cpp | 3 +- cpp/src/DataStorm/NodeI.cpp | 67 +++++++++++------------- cpp/src/DataStorm/NodeSessionManager.cpp | 32 ++++++----- cpp/src/DataStorm/SessionI.cpp | 10 ++-- cpp/src/DataStorm/TopicFactoryI.cpp | 8 --- cpp/src/DataStorm/TopicI.cpp | 2 + 6 files changed, 54 insertions(+), 68 deletions(-) diff --git a/cpp/src/DataStorm/DataElementI.cpp b/cpp/src/DataStorm/DataElementI.cpp index b8058b7e217..fa88d300e93 100644 --- a/cpp/src/DataStorm/DataElementI.cpp +++ b/cpp/src/DataStorm/DataElementI.cpp @@ -178,7 +178,8 @@ DataElementI::attach( session->subscriberInitialized(topicId, id > 0 ? data.id : -data.id, data.samples, key, shared_from_this()); if (!samplesI.empty()) { - return [=, this]() { initSamples(samplesI, topicId, data.id, priority, now, id < 0); }; + return [=, self = shared_from_this()]() + { self->initSamples(samplesI, topicId, data.id, priority, now, id < 0); }; } return nullptr; } diff --git a/cpp/src/DataStorm/NodeI.cpp b/cpp/src/DataStorm/NodeI.cpp index da8cf37d9fd..c528b40acf6 100644 --- a/cpp/src/DataStorm/NodeI.cpp +++ b/cpp/src/DataStorm/NodeI.cpp @@ -184,22 +184,20 @@ NodeI::createSession( return; // Shutting down or already connected } - auto self = shared_from_this(); s->ice_getConnectionAsync( - [=, this](auto connection) mutable + [=, self = shared_from_this()](auto connection) mutable { if (session->checkSession()) { return; } - if (connection && !connection->getAdapter()) - { - connection->setAdapter(getInstance()->getObjectAdapter()); - } - if (connection) { + if (!connection->getAdapter()) + { + connection->setAdapter(self->getInstance()->getObjectAdapter()); + } subscriberSession = subscriberSession->ice_fixed(connection); } @@ -207,24 +205,26 @@ NodeI::createSession( { // Must be called before connected s->confirmCreateSessionAsync( - _proxy, + self->_proxy, session->getProxy(), nullptr, - [=](auto ex) { self->removePublisherSession(*subscriber, session, ex); }); + [self, subscriber, session](auto ex) + { self->removePublisherSession(*subscriber, session, ex); }); assert(!s->ice_getCachedConnection() || s->ice_getCachedConnection() == connection); // Session::connected informs the subscriber session of all the topic writers in the current node. session->connected( *subscriberSession, connection, - getInstance()->getTopicFactory()->getTopicWriters()); + self->getInstance()->getTopicFactory()->getTopicWriters()); } catch (const Ice::LocalException&) { - removePublisherSession(*subscriber, session, current_exception()); + self->removePublisherSession(*subscriber, session, current_exception()); } }, - [=](auto ex) { self->removePublisherSession(*subscriber, session, ex); }); + [self = shared_from_this(), subscriber, session](auto ex) + { self->removePublisherSession(*subscriber, session, ex); }); } catch (const Ice::LocalException&) { @@ -266,41 +266,33 @@ NodeI::confirmCreateSession( void NodeI::createSubscriberSession( NodePrx subscriber, - const Ice::ConnectionPtr& connection, + const Ice::ConnectionPtr& subscriberConnection, const shared_ptr& session) { auto instance = _instance.lock(); if (!instance) { - // Ignore the Node is being destroyed. + // Ignore the Node is being shutdown. return; } try { - subscriber = getNodeWithExistingConnection(std::move(instance), subscriber, connection); + subscriber = getNodeWithExistingConnection(std::move(instance), subscriber, subscriberConnection); - auto self = shared_from_this(); -#if defined(__GNUC__) -# pragma GCC diagnostic push -# pragma GCC diagnostic ignored "-Wshadow" -#endif subscriber->ice_getConnectionAsync( - [=, this](auto connection) + [=, self = shared_from_this()](auto connection) { if (connection && !connection->getAdapter()) { - connection->setAdapter(getInstance()->getObjectAdapter()); + connection->setAdapter(self->getInstance()->getObjectAdapter()); } subscriber->initiateCreateSessionAsync( - _proxy, + self->_proxy, nullptr, [=](auto ex) { self->removePublisherSession(subscriber, session, ex); }); }, - [=](auto ex) { self->removePublisherSession(subscriber, session, ex); }); -#if defined(__GNUC__) -# pragma GCC diagnostic pop -#endif + [=, self = shared_from_this()](auto ex) { self->removePublisherSession(subscriber, session, ex); }); } catch (const Ice::LocalException&) { @@ -309,18 +301,21 @@ NodeI::createSubscriberSession( } void -NodeI::createPublisherSession(NodePrx publisher, const Ice::ConnectionPtr& con, shared_ptr session) +NodeI::createPublisherSession( + NodePrx publisher, + const Ice::ConnectionPtr& publisherConnection, + shared_ptr session) { auto instance = _instance.lock(); if (!instance) { - // Ignore the Node is being destroyed. + // Ignore the Node is being shutdown. return; } try { - auto p = getNodeWithExistingConnection(std::move(instance), publisher, con); + auto p = getNodeWithExistingConnection(std::move(instance), publisher, publisherConnection); unique_lock lock(_mutex); if (!session) @@ -332,9 +327,8 @@ NodeI::createPublisherSession(NodePrx publisher, const Ice::ConnectionPtr& con, } } - auto self = shared_from_this(); p->ice_getConnectionAsync( - [=, this](auto connection) + [=, self = shared_from_this()](auto connection) { if (session->checkSession()) { @@ -343,13 +337,13 @@ NodeI::createPublisherSession(NodePrx publisher, const Ice::ConnectionPtr& con, if (connection && !connection->getAdapter()) { - connection->setAdapter(getInstance()->getObjectAdapter()); + connection->setAdapter(self->getInstance()->getObjectAdapter()); } try { p->createSessionAsync( - _proxy, + self->_proxy, session->getProxy(), false, nullptr, @@ -357,10 +351,11 @@ NodeI::createPublisherSession(NodePrx publisher, const Ice::ConnectionPtr& con, } catch (const Ice::LocalException&) { - removeSubscriberSession(publisher, session, current_exception()); + self->removeSubscriberSession(publisher, session, current_exception()); } }, - [=](exception_ptr ex) { self->removeSubscriberSession(publisher, session, ex); }); + [=, self = shared_from_this()](exception_ptr ex) + { self->removeSubscriberSession(publisher, session, ex); }); } catch (const Ice::LocalException&) { diff --git a/cpp/src/DataStorm/NodeSessionManager.cpp b/cpp/src/DataStorm/NodeSessionManager.cpp index dd84cc3d50c..3f1cd19ec9e 100644 --- a/cpp/src/DataStorm/NodeSessionManager.cpp +++ b/cpp/src/DataStorm/NodeSessionManager.cpp @@ -112,8 +112,7 @@ NodeSessionManager::createOrGet(NodePrx node, const Ice::ConnectionPtr& connecti session->init(); _sessions.emplace(node->ice_getIdentity(), session); - // TODO we should review this code, to avoid using the proxy shared_ptr as a map key. - // Specially the connection manager doesn't use this proxy for lookup. + // Register a callback with the connection manager to destroy the session when the connection is closed. instance->getConnectionManager()->add( connection, make_shared(node), @@ -306,6 +305,13 @@ NodeSessionManager::forward(const Ice::ByteSeq& inParams, const Ice::Current& cu void NodeSessionManager::connect(LookupPrx lookup, NodePrx proxy) { + auto instance = _instance.lock(); + if (!instance) + { + // Ignore the Node is being shutdown. + return; + } + try { lookup->createSessionAsync( @@ -326,6 +332,10 @@ NodeSessionManager::connect(LookupPrx lookup, NodePrx proxy) [=, self = shared_from_this()](std::exception_ptr) { self->disconnected(lookup); }); } catch (const Ice::CommunicatorDestroyedException&) + { + // Ignore node is being shutdown. + } + catch (const std::exception&) { disconnected(lookup); } @@ -338,6 +348,7 @@ NodeSessionManager::connected(NodePrx node, LookupPrx lookup) auto instance = _instance.lock(); if (!instance) { + // Ignore the Node is being shutdown. return; } @@ -376,6 +387,7 @@ NodeSessionManager::connected(NodePrx node, LookupPrx lookup) } catch (const Ice::CommunicatorDestroyedException&) { + // Ignore node is being shutdown. } } } @@ -407,21 +419,7 @@ NodeSessionManager::disconnected(LookupPrx lookup) if (instance) { instance->scheduleTimerTask( - [=, self = shared_from_this()] - { -#if defined(__GNUC__) -# pragma GCC diagnostic push -# pragma GCC diagnostic ignored "-Wshadow" -#endif - auto instance = self->_instance.lock(); - if (instance) - { - self->connect(lookup, self->_nodePrx); - } -#if defined(__GNUC__) -# pragma GCC diagnostic pop -#endif - }, + [=, self = shared_from_this()] { self->connect(lookup, self->_nodePrx); }, instance->getRetryDelay(_retryCount++)); } } diff --git a/cpp/src/DataStorm/SessionI.cpp b/cpp/src/DataStorm/SessionI.cpp index 8a0bd5274b2..03a12b12744 100644 --- a/cpp/src/DataStorm/SessionI.cpp +++ b/cpp/src/DataStorm/SessionI.cpp @@ -789,12 +789,10 @@ SessionI::checkSession() { if (_connection) { - // - // Make sure the connection is still established. It's possible that the connection got closed - // and we're not notified yet by the connection manager. Check session explicitly check for the - // connection to make sure that if we get a session creation request from a peer (which might - // detect the connection closure before), it doesn't get ignored. - // + // Make sure the connection is still established. It's possible that the connection got closed and we + // were not notified yet by the connection manager. Check session explicitly check for the connection + // to make sure that if we get a session creation request from a peer (which might detect the connection + // closure before), it doesn't get ignored. try { _connection->throwException(); diff --git a/cpp/src/DataStorm/TopicFactoryI.cpp b/cpp/src/DataStorm/TopicFactoryI.cpp index d81432552c7..efb9a432d75 100644 --- a/cpp/src/DataStorm/TopicFactoryI.cpp +++ b/cpp/src/DataStorm/TopicFactoryI.cpp @@ -70,10 +70,6 @@ TopicFactoryI::createTopicReader( catch (const Ice::ObjectAdapterDestroyedException&) { } - catch (const std::exception&) - { - assert(false); - } return reader; } @@ -127,10 +123,6 @@ TopicFactoryI::createTopicWriter( catch (const Ice::ObjectAdapterDestroyedException&) { } - catch (const std::exception&) - { - assert(false); - } return writer; } diff --git a/cpp/src/DataStorm/TopicI.cpp b/cpp/src/DataStorm/TopicI.cpp index 723e5e49488..fd8b598e608 100644 --- a/cpp/src/DataStorm/TopicI.cpp +++ b/cpp/src/DataStorm/TopicI.cpp @@ -108,6 +108,8 @@ TopicI::TopicI( _instance(_factory.lock()->getInstance()), _traceLevels(_instance->getTraceLevels()), _id(id), + // The collocated forwarder is initalized here to avoid using a nullable proxy. The forwarder is only used by + // the instance that owns it and is removed in destroy implementation. _forwarder{_instance->getCollocatedForwarder()->add( [this](Ice::ByteSeq inParams, const Ice::Current& current) { forward(inParams, current); })}, _destroyed(false),