Skip to content

Commit

Permalink
feat(rust): enable mptcp support between nodes and inside portals
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Feb 18, 2025
1 parent feeaabf commit 188fc47
Show file tree
Hide file tree
Showing 61 changed files with 466 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use ockam::identity::{
SecureChannelOptions, TrustMultiIdentifiersPolicy,
};
use ockam::remote::RemoteRelayOptions;
use ockam::tcp::{TcpOutletOptions, TcpTransportExtension};
use ockam::tcp::{TcpOutletOptions, TcpTransportExtension, TCP};
use ockam::transport::HostnamePort;
use ockam::{node, Context, Result};
use ockam_api::authenticator::enrollment_tokens::TokenAcceptor;
Expand Down Expand Up @@ -82,6 +82,7 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime
// Create a credential retriever that will be used to obtain credentials
let credential_retriever = Arc::new(RemoteCredentialRetrieverCreator::new(
node.context().try_clone()?,
TCP,
tcp.clone(),
node.secure_channels(),
RemoteCredentialRetrieverInfo::create_for_project_member(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use ockam::identity::{
TrustMultiIdentifiersPolicy,
};
use ockam::node;
use ockam::tcp::{TcpInletOptions, TcpTransportExtension, TCP};
use ockam::{route, Context, Result};
use ockam_api::authenticator::enrollment_tokens::TokenAcceptor;
use ockam_api::authenticator::one_time_code::OneTimeCode;
Expand All @@ -13,7 +14,6 @@ use ockam_api::{RemoteMultiaddrResolver, TransportRouteResolver};
use ockam_core::compat::sync::Arc;
use ockam_core::TryClone;
use ockam_multiaddr::MultiAddr;
use ockam_transport_tcp::{TcpInletOptions, TcpTransportExtension};

/// This node supports an "edge" server which can connect to a "control" node
/// in order to connect its TCP inlet to the "control" node TCP outlet
Expand Down Expand Up @@ -80,6 +80,7 @@ async fn start_node(ctx: Context, project_information_path: &str, token: OneTime
// Create a credential retriever that will be used to obtain credentials
let credential_retriever = Arc::new(RemoteCredentialRetrieverCreator::new(
node.context().try_clone()?,
TCP,
tcp.clone(),
node.secure_channels(),
RemoteCredentialRetrieverInfo::create_for_project_member(
Expand Down
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub mod tcp {
pub use ockam_transport_tcp::{
TcpConnection, TcpConnectionMode, TcpConnectionOptions, TcpInletOptions, TcpListener,
TcpListenerInfo, TcpListenerOptions, TcpOutletOptions, TcpSenderInfo, TcpTransport,
TcpTransportExtension, MAX_MESSAGE_SIZE, TCP,
TcpTransportExtension, MAX_MESSAGE_SIZE, MPTCP, TCP,
};
}
#[cfg(feature = "ockam_transport_udp")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ async fn handle_tcp_inlet_create(
tls_certificate_provider,
false,
false,
false,
)
.await;
match result {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ async fn handle_tcp_outlet_create(
priviledged,
false,
false,
false,
)
.await;

Expand Down
7 changes: 6 additions & 1 deletion implementations/rust/ockam/ockam_api/src/influxdb/portal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl NodeManagerWorker {
tls,
skip_handshake,
enable_nagle,
enable_mptcp,
} = body.tcp_outlet;
let address = self
.node_manager
Expand Down Expand Up @@ -100,6 +101,7 @@ impl NodeManagerWorker {
privileged,
skip_handshake,
enable_nagle,
enable_mptcp,
)
.await
{
Expand Down Expand Up @@ -128,6 +130,7 @@ impl NodeManagerWorker {
tls_certificate_provider,
skip_handshake,
enable_nagle,
enable_mptcp,
} = body.tcp_inlet.clone();

//TODO: should be an easier way to tweak the multiaddr
Expand Down Expand Up @@ -198,6 +201,7 @@ impl NodeManagerWorker {
tls_certificate_provider,
skip_handshake,
enable_nagle,
enable_mptcp,
)
.await
{
Expand Down Expand Up @@ -344,7 +348,7 @@ impl InfluxDBPortals for BackgroundNodeClient {
influxdb_config: InfluxDBOutletConfig,
) -> miette::Result<OutletStatus> {
let mut outlet_payload =
CreateOutlet::new(to, tls, from.cloned(), true, false, false, false);
CreateOutlet::new(to, tls, from.cloned(), true, false, false, false, false);
if let Some(policy_expression) = policy_expression {
outlet_payload.set_policy_expression(policy_expression);
}
Expand Down Expand Up @@ -388,6 +392,7 @@ impl InfluxDBPortals for BackgroundNodeClient {
tls_certificate_provider,
false,
false,
false,
);
let payload = CreateInfluxDBInlet::new(inlet_payload, lease_usage, lease_issuer_route);
Request::post("/node/influxdb_inlet").body(payload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ impl KafkaInletController {
None,
false,
false,
false,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl KafkaOutletController {
false,
false,
false,
false,
)
.await
.map(|info| info.to)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use ockam::identity::{get_default_timeout, Identifier, SecureClient, TrustIdenti
use ockam_core::env::{get_env_with_default, FromString};
use ockam_multiaddr::MultiAddr;
use ockam_node::Context;
use ockam_transport_tcp::TcpTransport;
use ockam_transport_core::TransportImpl;
use ockam_transport_tcp::{TcpTransport, TCP};
use std::fmt::{Debug, Display, Formatter};
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
Expand Down Expand Up @@ -521,7 +522,10 @@ async fn make_secure_client(
Ok(SecureClient::new(
secure_channels,
None,
TcpTransport::get_or_create(ctx)?,
TransportImpl {
t_type: TCP,
transport: TcpTransport::get_or_create(ctx)?,
},
project_route,
Arc::new(TrustIdentifierPolicy::new(identifier)),
&default_node.identifier(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ pub(crate) mod tests {
};
use ockam_core::{route, Address};
use ockam_node::{Context, NodeBuilder};
use ockam_transport_core::TransportImpl;
use ockam_transport_tcp::{TcpListenerOptions, TcpTransport, TCP};
use opentelemetry::trace::{SpanContext, SpanId, SpanKind};
use std::sync::Arc;
Expand Down Expand Up @@ -214,7 +215,10 @@ pub(crate) mod tests {
Ok(SecureClient::new(
secure_channels,
None,
tcp_transport,
TransportImpl {
t_type: TCP,
transport: tcp_transport,
},
route,
Arc::new(TrustEveryonePolicy),
&client_identifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use ockam::udp::{UdpBind, UdpBindArguments, UdpBindOptions, UdpTransport};
use ockam_core::errcode::{Kind, Origin};
use ockam_core::flow_control::FlowControlId;
use ockam_core::{Address, Error, Result, Route, LOCAL};
use ockam_multiaddr::proto::{DnsAddr, Ip4, Ip6, Secure, Service, Tcp, Udp, Worker};
use ockam_multiaddr::proto::{DnsAddr, Ip4, Ip6, Mptcp, Secure, Service, Tcp, Udp, Worker};
use ockam_multiaddr::{MultiAddr, ProtoIter, Protocol};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
Expand Down Expand Up @@ -163,19 +163,23 @@ impl RemoteMultiaddrResolver {
tcp: &TcpTransport,
ma: &MultiAddr,
peer: String,
enable_mptcp: bool,
) -> Result<TcpConnection> {
tcp.connect(peer, TcpConnectionOptions::new())
.await
.map_err(|err| {
Error::new(
Origin::Api,
Kind::Io,
format!(
"Couldn't make TCP connection while resolving multiaddr: {}. Err: {}",
ma, err
),
)
})
tcp.connect(
peer,
TcpConnectionOptions::new().set_enable_mptcp(enable_mptcp),
)
.await
.map_err(|err| {
Error::new(
Origin::Api,
Kind::Io,
format!(
"Couldn't make TCP connection while resolving multiaddr: {}. Err: {}",
ma, err
),
)
})
}

async fn connect_udp(&self, udp: &UdpTransport, ma: &MultiAddr, peer: &str) -> Result<UdpBind> {
Expand Down Expand Up @@ -210,7 +214,16 @@ impl RemoteMultiaddrResolver {
) -> Result<RemoteMultiaddrResolverConnection> {
let next = it.next().ok_or_else(|| invalid_multiaddr_error(ma))?;

if let Some(port) = next.cast::<Tcp>() {
#[allow(clippy::manual_map)]
let tcp_info = if let Some(port) = next.cast::<Tcp>() {
Some((*port, false))
} else if let Some(port) = next.cast::<Mptcp>() {
Some((*port, true))
} else {
None
};

if let Some((port, enable_mptcp)) = tcp_info {
let tcp = self.tcp.as_ref().ok_or_else(|| {
Error::new(
Origin::Api,
Expand All @@ -219,8 +232,8 @@ impl RemoteMultiaddrResolver {
)
})?;

let peer = format!("{}:{}", peer, *port);
let connection = self.connect_tcp(tcp, ma, peer).await?;
let peer = format!("{}:{}", peer, port);
let connection = self.connect_tcp(tcp, ma, peer, enable_mptcp).await?;

return Ok(RemoteMultiaddrResolverConnection::Tcp(connection));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::net::{SocketAddrV4, SocketAddrV6};

use crate::multiaddr_resolver::{invalid_multiaddr_error, multiple_transport_hops_error};
use ockam::tcp::TCP;
use ockam::tcp::{MPTCP, TCP};
use ockam::udp::UDP;
use ockam_core::errcode::{Kind, Origin};
use ockam_core::{Address, Error, Result, Route, TransportType, LOCAL};
use ockam_multiaddr::proto::{DnsAddr, Ip4, Ip6, Secure, Service, Tcp, Udp, Worker};
use ockam_multiaddr::proto::{DnsAddr, Ip4, Ip6, Mptcp, Secure, Service, Tcp, Udp, Worker};
use ockam_multiaddr::{MultiAddr, ProtoIter, ProtoValue, Protocol};

#[derive(Default, Debug, Clone)]
Expand Down Expand Up @@ -156,7 +156,16 @@ impl TransportRouteResolver {
}

fn parse_port(&self, ma: &MultiAddr, next: &ProtoValue) -> Result<(TransportType, u16)> {
if let Some(port) = next.cast::<Tcp>() {
#[allow(clippy::manual_map)]
let tcp_info = if let Some(port) = next.cast::<Tcp>() {
Some((*port, TCP))
} else if let Some(port) = next.cast::<Mptcp>() {
Some((*port, MPTCP))
} else {
None
};

if let Some((tcp_port, transport_type)) = tcp_info {
if !self.allow_tcp {
return Err(Error::new(
Origin::Api,
Expand All @@ -165,7 +174,7 @@ impl TransportRouteResolver {
));
}

return Ok((TCP, port.0));
return Ok((transport_type, tcp_port));
}

if let Some(port) = next.cast::<Udp>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{RemoteMultiaddrResolver, RemoteMultiaddrResolverConnection, ReverseL

use crate::nodes::NodeManager;
use ockam_core::{async_trait, Error, Route};
use ockam_multiaddr::proto::{DnsAddr, Ip4, Ip6, Tcp};
use ockam_multiaddr::proto::{DnsAddr, Ip4, Ip6, Mptcp, Tcp};
use ockam_multiaddr::{Match, MultiAddr, Protocol};
use ockam_node::Context;

Expand All @@ -23,7 +23,7 @@ impl Instantiator for PlainTcpInstantiator {
vec![
// matches any tcp address followed by a tcp protocol
Match::any([DnsAddr::CODE, Ip4::CODE, Ip6::CODE]),
Tcp::CODE.into(),
Match::any([Tcp::CODE, Mptcp::CODE]),
]
}

Expand Down
13 changes: 12 additions & 1 deletion implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ pub struct CreateInlet {
#[n(14)] pub(crate) skip_handshake: bool,
/// Enable Nagle's algorithm for potentially higher throughput, but higher latency
#[n(15)] pub(crate) enable_nagle: bool,
/// Enable MPTCP
#[n(16)] pub enable_mptcp: bool,
}

impl Encodable for CreateInlet {
Expand All @@ -89,6 +91,7 @@ impl CreateInlet {
privileged: bool,
skip_handshake: bool,
enable_nagle: bool,
enable_mptcp: bool,
) -> Self {
Self {
listen_addr: listen,
Expand All @@ -105,6 +108,7 @@ impl CreateInlet {
tls_certificate_provider: None,
skip_handshake,
enable_nagle,
enable_mptcp,
}
}

Expand All @@ -120,6 +124,7 @@ impl CreateInlet {
privileged: bool,
skip_handshake: bool,
enable_nagle: bool,
enable_mptcp: bool,
) -> Self {
Self {
listen_addr: listen,
Expand All @@ -136,6 +141,7 @@ impl CreateInlet {
tls_certificate_provider: None,
skip_handshake,
enable_nagle,
enable_mptcp,
}
}

Expand Down Expand Up @@ -199,7 +205,9 @@ pub struct CreateOutlet {
/// Skip Portal handshake for lower latency, but also lower throughput
#[n(7)] pub skip_handshake: bool,
/// Enable Nagle's algorithm for potentially higher throughput, but higher latency
#[n(8)] pub(crate) enable_nagle: bool,
#[n(8)] pub enable_nagle: bool,
/// Enable MPTCP
#[n(9)] pub enable_mptcp: bool,
}

impl Encodable for CreateOutlet {
Expand All @@ -215,6 +223,7 @@ impl Decodable for CreateOutlet {
}

impl CreateOutlet {
#[allow(clippy::too_many_arguments)]
pub fn new(
hostname_port: HostnamePort,
tls: bool,
Expand All @@ -223,6 +232,7 @@ impl CreateOutlet {
privileged: bool,
skip_handshake: bool,
enable_nagle: bool,
enable_mptcp: bool,
) -> Self {
Self {
hostname_port,
Expand All @@ -233,6 +243,7 @@ impl CreateOutlet {
privileged,
skip_handshake,
enable_nagle,
enable_mptcp,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ impl InMemoryNode {
None,
false,
false,
false,
)
.await?;

Expand Down Expand Up @@ -328,6 +329,7 @@ impl InMemoryNode {
false,
false,
false,
false,
)
.await?;

Expand Down
Loading

0 comments on commit 188fc47

Please sign in to comment.