Skip to content

Commit

Permalink
More session establishment fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone committed Jan 13, 2025
1 parent 99fb42f commit bc09db2
Showing 1 changed file with 35 additions and 12 deletions.
47 changes: 35 additions & 12 deletions cpp/src/DataStorm/NodeI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ NodeI::createSession(
{
connection->setAdapter(instance->getObjectAdapter());
}

// Use a fixed proxy to ensure the request is sent using the connection configured with the OA.
s = s->ice_fixed(connection);
subscriberSession = subscriberSession->ice_fixed(connection);
}

Expand All @@ -210,7 +213,6 @@ NodeI::createSession(
nullptr,
[self, subscriber, session](auto ex)
{ self->removePublisherSession(*subscriber, session, ex); });
assert(!s->ice_getCachedConnection() || s->ice_getCachedConnection() == connection);

// Session::connected informs the subscriber session of all the topic writers in the current node.
session->connected(*subscriberSession, connection, instance->getTopicFactory()->getTopicWriters());
Expand All @@ -220,7 +222,7 @@ NodeI::createSession(
self->removePublisherSession(*subscriber, session, current_exception());
}
},
[self = shared_from_this(), subscriber, session](auto ex)
[self = shared_from_this(), session, subscriber](exception_ptr ex)
{ self->removePublisherSession(*subscriber, session, ex); });
}
catch (const LocalException&)
Expand Down Expand Up @@ -251,7 +253,9 @@ NodeI::confirmCreateSession(
return;
}

if (current.con && publisherSession->ice_getEndpoints().empty() && publisherSession->ice_getAdapterId().empty())
// If publisher session is hosted on a relay, current.con is the connection to that relay. Otherwise this is a
// connection to the publisher node.
if (current.con)
{
publisherSession = publisherSession->ice_fixed(current.con);
}
Expand All @@ -277,18 +281,27 @@ NodeI::createSubscriberSession(
subscriber = getNodeWithExistingConnection(instance, subscriber, subscriberConnection);

subscriber->ice_getConnectionAsync(
[=, self = shared_from_this()](const auto& connection)
[self = shared_from_this(), instance, session, subscriber](const auto& connection)
{
if (connection && !connection->getAdapter())
auto s = subscriber;
if (connection)
{
connection->setAdapter(instance->getObjectAdapter());
if (!connection->getAdapter())
{
connection->setAdapter(instance->getObjectAdapter());
}

// Use a fixed proxy to ensure the request is sent using the connection configured with the OA.
s = s->ice_fixed(connection);
}
subscriber->initiateCreateSessionAsync(

s->initiateCreateSessionAsync(
self->_proxy,
nullptr,
[=](auto ex) { self->removePublisherSession(subscriber, session, ex); });
[self, session, subscriber](exception_ptr ex)
{ self->removePublisherSession(subscriber, session, ex); });
},
[subscriber, session, self = shared_from_this()](auto ex)
[subscriber, session, self = shared_from_this()](exception_ptr ex)
{ self->removePublisherSession(subscriber, session, ex); });
}
catch (const LocalException&)
Expand Down Expand Up @@ -318,19 +331,29 @@ NodeI::createPublisherSession(
{
return; // Shutting down.
}
else if (session->checkSession())
{
return; // Already connected.
}
}

p->ice_getConnectionAsync(
[=, self = shared_from_this()](const auto& connection)
[self = shared_from_this(), instance, session, publisher, p](const auto& connection) mutable
{
if (session->checkSession())
{
return;
}

if (connection && !connection->getAdapter())
if (connection)
{
connection->setAdapter(instance->getObjectAdapter());
if (!connection->getAdapter())
{
connection->setAdapter(instance->getObjectAdapter());
}

// Use a fixed proxy to ensure the request is sent using the connection configured with the OA.
p = p->ice_fixed(connection);
}

try
Expand Down

0 comments on commit bc09db2

Please sign in to comment.