Skip to content

Commit

Permalink
Refs #22056: Backport TCP features
Browse files Browse the repository at this point in the history
---------

Signed-off-by: cferreiragonz <[email protected]>
  • Loading branch information
cferreiragonz committed Nov 18, 2024
1 parent 5616d90 commit e646cce
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 10 deletions.
3 changes: 0 additions & 3 deletions src/cpp/rtps/security/SecurityManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,6 @@ bool SecurityManager::create_participant_stateless_message_reader()
ratt.endpoint.multicastLocatorList = pattr.builtin.metatrafficMulticastLocatorList;
}
ratt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList;
ratt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList;
ratt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators;
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
ratt.matched_writers_allocation = pattr.allocation.participants;
Expand Down Expand Up @@ -1274,7 +1273,6 @@ bool SecurityManager::create_participant_volatile_message_secure_writer()
watt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList;
watt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators;
watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
watt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList;
watt.endpoint.security_attributes().is_submessage_protected = true;
watt.endpoint.security_attributes().plugin_endpoint_attributes =
PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED;
Expand Down Expand Up @@ -1327,7 +1325,6 @@ bool SecurityManager::create_participant_volatile_message_secure_reader()
ratt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList;
ratt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators;
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
ratt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList;
ratt.endpoint.security_attributes().is_submessage_protected = true;
ratt.endpoint.security_attributes().plugin_endpoint_attributes =
PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED;
Expand Down
78 changes: 74 additions & 4 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ void TCPTransportInterface::clean()

{
std::vector<std::shared_ptr<TCPChannelResource>> channels;
std::vector<eprosima::fastdds::rtps::Locator> delete_channels;

{
std::unique_lock<std::mutex> scopedLock(sockets_map_mutex_);
Expand All @@ -200,10 +201,22 @@ void TCPTransportInterface::clean()

for (auto& channel : channel_resources_)
{
channels.push_back(channel.second);
if (std::find(channels.begin(), channels.end(), channel.second) == channels.end())
{
channels.push_back(channel.second);
}
else
{
delete_channels.push_back(channel.first);
}
}
}

for (auto& delete_channel : delete_channels)
{
channel_resources_.erase(delete_channel);
}

for (auto& channel : channels)
{
if (channel->connection_established())
Expand Down Expand Up @@ -279,7 +292,7 @@ Locator TCPTransportInterface::local_endpoint_to_locator(
return locator;
}

void TCPTransportInterface::bind_socket(
ResponseCode TCPTransportInterface::bind_socket(
std::shared_ptr<TCPChannelResource>& channel)
{
std::unique_lock<std::mutex> scopedLock(sockets_map_mutex_);
Expand All @@ -288,7 +301,29 @@ void TCPTransportInterface::bind_socket(
auto it_remove = std::find(unbound_channel_resources_.begin(), unbound_channel_resources_.end(), channel);
assert(it_remove != unbound_channel_resources_.end());
unbound_channel_resources_.erase(it_remove);
channel_resources_[channel->locator()] = channel;

ResponseCode ret = RETCODE_OK;
const auto insert_ret = channel_resources_.insert(
decltype(channel_resources_)::value_type{channel->locator(), channel});
if (false == insert_ret.second)
{
// There is an existing channel that can be used. Force the Client to close unnecessary socket
ret = RETCODE_SERVER_ERROR;
}

std::vector<fastdds::rtps::IPFinder::info_IP> local_interfaces;
// Check if the locator is from an owned interface to link all local interfaces to the channel
is_own_interface(channel->locator(), local_interfaces);
if (!local_interfaces.empty())
{
Locator local_locator(channel->locator());
for (auto& interface_it : local_interfaces)
{
IPLocator::setIPv4(local_locator, interface_it.locator);
channel_resources_.insert(decltype(channel_resources_)::value_type{local_locator, channel});
}
}
return ret;
}

bool TCPTransportInterface::check_crc(
Expand Down Expand Up @@ -923,7 +958,7 @@ bool TCPTransportInterface::CreateInitialConnect(
std::lock_guard<std::mutex> socketsLock(sockets_map_mutex_);

// We try to find a SenderResource that has this locator.
// Note: This is done in this level because if we do in NetworkFactory level, we have to mantain what transport
// Note: This is done in this level because if we do it at NetworkFactory level, we have to mantain what transport
// already reuses a SenderResource.
for (auto& sender_resource : send_resource_list)
{
Expand Down Expand Up @@ -987,6 +1022,19 @@ bool TCPTransportInterface::CreateInitialConnect(
send_resource_list.emplace_back(
static_cast<SenderResource*>(new TCPSenderResource(*this, physical_locator)));

std::vector<fastdds::rtps::IPFinder::info_IP> local_interfaces;
// Check if the locator is from an owned interface to link all local interfaces to the channel
is_own_interface(physical_locator, local_interfaces);
if (!local_interfaces.empty())
{
Locator local_locator(physical_locator);
for (auto& interface_it : local_interfaces)
{
IPLocator::setIPv4(local_locator, interface_it.locator);
channel_resources_[local_locator] = channel;
}
}

return true;
}

Expand Down Expand Up @@ -2079,6 +2127,28 @@ void TCPTransportInterface::send_channel_pending_logical_ports(
}
}

void TCPTransportInterface::is_own_interface(
const Locator& locator,
std::vector<fastdds::rtps::IPFinder::info_IP>& locNames) const
{
std::vector<fastdds::rtps::IPFinder::info_IP> local_interfaces;
get_ips(local_interfaces, true, false);
for (const auto& interface_it : local_interfaces)
{
if (IPLocator::compareAddress(locator, interface_it.locator) && is_interface_allowed(interface_it.name))
{
locNames = local_interfaces;
// Remove interface of original locator from the list
locNames.erase(std::remove_if(locNames.begin(), locNames.end(),
[&interface_it](const fastdds::rtps::IPFinder::info_IP& locInterface)
{
return locInterface.locator == interface_it.locator;
}), locNames.end());
break;
}
}
}

} // namespace rtps
} // namespace fastdds
} // namespace eprosima
12 changes: 11 additions & 1 deletion src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class TCPTransportInterface : public TransportInterface
virtual ~TCPTransportInterface();

//! Stores the binding between the given locator and the given TCP socket. Server side.
void bind_socket(
ResponseCode bind_socket(
std::shared_ptr<TCPChannelResource>&);

//! Removes the listening socket for the specified port.
Expand Down Expand Up @@ -525,6 +525,16 @@ class TCPTransportInterface : public TransportInterface
*/
void send_channel_pending_logical_ports(
std::shared_ptr<TCPChannelResource>& channel);

/**
* Method to check if a locator contains an interface that belongs to the same host.
* If it occurs, @c locNames will be updated with the list of interfaces of the host.
* @param [in] locator Locator to check.
* @param [in,out] locNames Vector of interfaces to be updated.
*/
void is_own_interface(
const Locator& locator,
std::vector<fastdds::rtps::IPFinder::info_IP>& locNames) const;
};

} // namespace rtps
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ ResponseCode RTCPMessageManager::processBindConnectionRequest(

if (RETCODE_OK == code)
{
mTransport->bind_socket(channel);
code = mTransport->bind_socket(channel);
}

sendData(channel, BIND_CONNECTION_RESPONSE, transaction_id, &payload, code);
Expand Down
10 changes: 9 additions & 1 deletion test/unittest/transport/TCPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2001,6 +2001,9 @@ TEST_F(TCPv4Tests, autofill_port)
// This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators.
// Clients typically communicated its PID as its locator port. When having several clients in the same
// process this lead to overwriting server's channel resources map elements.
// In order to ensure communication in TCP Discovery Server, a different entry is created in the server's
// channel resources map for each IP interface found, all of them using the same TCP channel. Thus, two
// clients will generate at least two entries in the server's channel resources map.
TEST_F(TCPv4Tests, client_announced_local_port_uniqueness)
{
TCPv4TransportDescriptor recvDescriptor;
Expand Down Expand Up @@ -2032,7 +2035,12 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness)

std::this_thread::sleep_for(std::chrono::milliseconds(100));

ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2u);
std::set<std::shared_ptr<TCPChannelResource>> channels_created;
for (const auto& channel_resource : receiveTransportUnderTest.get_channel_resources())
{
channels_created.insert(channel_resource.second);
}
ASSERT_EQ(channels_created.size(), 2u);
}

#ifndef _WIN32
Expand Down
4 changes: 4 additions & 0 deletions tools/fds/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ int fastdds_discovery_server(
// Retrieve first TCP port
option::Option* pO_tcp_port = options[TCP_PORT];

bool udp_server_initialized = (pOp != nullptr) || (pO_port != nullptr);

/**
* A locator has been initialized previously in [0.0.0.0] address using either the DEFAULT_ROS2_SERVER_PORT or the
* port number set in the CLI. This locator must be used:
Expand All @@ -318,6 +320,7 @@ int fastdds_discovery_server(
// Add default locator in cases a) and b)
participantQos.wire_protocol().builtin.metatrafficUnicastLocatorList.clear();
participantQos.wire_protocol().builtin.metatrafficUnicastLocatorList.push_back(locator4);
udp_server_initialized = true;
}
else if (nullptr == pOp && nullptr != pO_port)
{
Expand Down Expand Up @@ -569,6 +572,7 @@ int fastdds_discovery_server(
}

fastdds::rtps::GuidPrefix_t guid_prefix = participantQos.wire_protocol().prefix;
participantQos.transport().use_builtin_transports = udp_server_initialized || options[XML_FILE] != nullptr;

// Create the server
int return_value = 0;
Expand Down

0 comments on commit e646cce

Please sign in to comment.