Skip to content

Commit

Permalink
Upgrade DataStorm to use a Default OA for OutgoingConnections
Browse files Browse the repository at this point in the history
  • Loading branch information
pepone committed Jan 14, 2025
1 parent 88f1b26 commit 2e06b67
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 98 deletions.
4 changes: 2 additions & 2 deletions cpp/include/Ice/Communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,15 @@ namespace Ice
* @return The object adapter associated by default with new outgoing connections.
* @see Connection::getAdapter
*/
[[nodiscard]] ObjectAdapterPtr getDefaultObjectAdapter() const noexcept;
[[nodiscard]] ObjectAdapterPtr getDefaultObjectAdapter() const;

/**
* Sets the object adapter that will be associated with new outgoing connections created by this
* communicator. This function has no effect on existing outgoing connections, or on incoming connections.
* @param adapter The object adapter to associate with new outgoing connections.
* @see Connection::setAdapter
*/
void setDefaultObjectAdapter(ObjectAdapterPtr adapter) noexcept;
void setDefaultObjectAdapter(ObjectAdapterPtr adapter);

/**
* Get the implicit context associated with this communicator.
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/DataStorm/Instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ using namespace Ice;
Instance::Instance(CommunicatorPtr communicator, function<void(function<void()> call)> customExecutor)
: _communicator(std::move(communicator))
{
if (_communicator->getDefaultObjectAdapter() != nullptr)
{
throw invalid_argument(
"communicator used to initialize a DataStorm node must not have a default object adapter");
}

PropertiesPtr properties = _communicator->getProperties();

if (properties->getIcePropertyAsInt("DataStorm.Node.Server.Enabled") > 0)
Expand Down Expand Up @@ -46,6 +52,7 @@ Instance::Instance(CommunicatorPtr communicator, function<void(function<void()>
{
_adapter = _communicator->createObjectAdapter("");
}
_communicator->setDefaultObjectAdapter(_adapter);

if (properties->getIcePropertyAsInt("DataStorm.Node.Multicast.Enabled") > 0)
{
Expand Down Expand Up @@ -179,6 +186,7 @@ Instance::destroy(bool ownsCommunicator)
timer->destroy();
}

_communicator->setDefaultObjectAdapter(nullptr);
if (ownsCommunicator)
{
_communicator->destroy();
Expand Down
86 changes: 18 additions & 68 deletions cpp/src/DataStorm/NodeI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,13 @@ NodeI::createSession(

unique_lock<mutex> lock(_mutex);
session = createPublisherSessionServant(*subscriber);
if (!session || session->checkSession())
if (!session)
{
return; // Shutting down.
}
else if (session->checkSession())
{
return; // Shutting down or already connected
return; // Already connected.
}

s->ice_getConnectionAsync(
Expand All @@ -193,12 +197,8 @@ NodeI::createSession(

if (connection)
{
if (!connection->getAdapter())
{
connection->setAdapter(instance->getObjectAdapter());
}

// Use a fixed proxy to ensure the request is sent using the connection configured with the OA.
// Use a fixed proxy to ensure the request is sent using the connection registered by connected
// with the connection manager.
s = s->ice_fixed(connection);
subscriberSession = subscriberSession->ice_fixed(connection);
}
Expand Down Expand Up @@ -281,28 +281,10 @@ NodeI::createSubscriberSession(
{
subscriber = getNodeWithExistingConnection(instance, subscriber, subscriberConnection);

subscriber->ice_getConnectionAsync(
[self = shared_from_this(), instance, session, subscriber](const auto& connection)
{
auto s = subscriber;
if (connection)
{
if (!connection->getAdapter())
{
connection->setAdapter(instance->getObjectAdapter());
}

// Use a fixed proxy to ensure the request is sent using the connection configured with the OA.
s = s->ice_fixed(connection);
}

s->initiateCreateSessionAsync(
self->_proxy,
nullptr,
[self, session, subscriber](exception_ptr ex)
{ self->removePublisherSession(subscriber, session, ex); });
},
[subscriber, session, self = shared_from_this()](exception_ptr ex)
subscriber->initiateCreateSessionAsync(
_proxy,
nullptr,
[self = shared_from_this(), session, subscriber](exception_ptr ex)
{ self->removePublisherSession(subscriber, session, ex); });
}
catch (const LocalException&)
Expand Down Expand Up @@ -338,40 +320,12 @@ NodeI::createPublisherSession(
}
}

p->ice_getConnectionAsync(
[self = shared_from_this(), instance, session, publisher, p](const auto& connection) mutable
{
if (session->checkSession())
{
return; // Already connected.
}

if (connection)
{
if (!connection->getAdapter())
{
connection->setAdapter(instance->getObjectAdapter());
}

// Use a fixed proxy to ensure the request is sent using the connection configured with the OA.
p = p->ice_fixed(connection);
}

try
{
p->createSessionAsync(
self->_proxy,
uncheckedCast<SubscriberSessionPrx>(session->getProxy()),
false,
nullptr,
[=](exception_ptr ex) { self->removeSubscriberSession(publisher, session, ex); });
}
catch (const LocalException&)
{
self->removeSubscriberSession(publisher, session, current_exception());
}
},
[publisher, session, self = shared_from_this()](exception_ptr ex)
p->createSessionAsync(
_proxy,
uncheckedCast<SubscriberSessionPrx>(session->getProxy()),
false,
nullptr,
[self = shared_from_this(), publisher, session](exception_ptr ex)
{ self->removeSubscriberSession(publisher, session, ex); });
}
catch (const LocalException&)
Expand Down Expand Up @@ -608,10 +562,6 @@ NodeI::getNodeWithExistingConnection(

if (connection)
{
if (!connection->getAdapter())
{
connection->setAdapter(instance->getObjectAdapter());
}
return node->ice_fixed(connection);
}

Expand Down
30 changes: 6 additions & 24 deletions cpp/src/DataStorm/NodeSessionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,31 +339,13 @@ NodeSessionManager::connect(const LookupPrx& lookup, const NodePrx& proxy)

try
{
lookup->ice_getConnectionAsync(
[self = shared_from_this(), lookup, proxy, instance](const auto& connection) mutable
lookup->createSessionAsync(
proxy,
[self = shared_from_this(), lookup](optional<NodePrx> node)
{
// Ensure that the connection is setup to dispatch requests before creating the session.
if (!connection->getAdapter())
{
connection->setAdapter(instance->getObjectAdapter());
}

auto l = lookup->ice_fixed(connection);
l->createSessionAsync(
proxy,
[self, l](optional<NodePrx> node)
{
// createSession must return a non null proxy.
if (node)
{
self->connected(*node, l);
}
else
{
self->disconnected(l);
}
},
[self, l](exception_ptr) { self->disconnected(l); });
// createSession must return a non null proxy.
assert(node);
self->connected(*node, lookup);
},
[self = shared_from_this(), lookup](exception_ptr) { self->disconnected(lookup); });
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/Ice/Communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,13 @@ Ice::Communicator::createObjectAdapterWithRouter(string name, RouterPrx router)
}

ObjectAdapterPtr
Ice::Communicator::getDefaultObjectAdapter() const noexcept
Ice::Communicator::getDefaultObjectAdapter() const
{
return _instance->outgoingConnectionFactory()->getDefaultObjectAdapter();
}

void
Ice::Communicator::setDefaultObjectAdapter(ObjectAdapterPtr adapter) noexcept
Ice::Communicator::setDefaultObjectAdapter(ObjectAdapterPtr adapter)
{
return _instance->outgoingConnectionFactory()->setDefaultObjectAdapter(std::move(adapter));
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/test/DataStorm/api/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ void ::Writer::run(int argc, char* argv[])
// Communicators shared with DataStorm must have a property set that can use the "DataStorm" opt-in prefix.
Ice::InitializationData initData;
initData.properties = make_shared<Ice::Properties>(vector<string>{"DataStorm"});
Node n2(Ice::initialize(initData));
n2.getCommunicator()->destroy();
Ice::CommunicatorHolder communicatorHolder(Ice::initialize(initData));
Node n2(communicatorHolder.communicator());
}

Ice::InitializationData initData;
Expand Down

0 comments on commit 2e06b67

Please sign in to comment.