Skip to content

Commit

Permalink
feat(rust): added support for multiple concurrent routes to tcp inlets
Browse files Browse the repository at this point in the history
  • Loading branch information
davide-baldo committed Mar 4, 2025
1 parent b3cd57a commit 1087b99
Show file tree
Hide file tree
Showing 48 changed files with 2,236 additions and 1,185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,21 @@ impl<T: TcpPortalsRepository> TcpPortalsRepository for AutoRetry<T> {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TcpInlet {
bind_addr: SocketAddr,
outlet_addr: MultiAddr,
outlet_addresses: Vec<MultiAddr>,
alias: String,
privileged: bool,
}

impl TcpInlet {
pub fn new(
bind_addr: &SocketAddr,
outlet_addr: &MultiAddr,
outlet_addresses: &[MultiAddr],
alias: &str,
privileged: bool,
) -> TcpInlet {
Self {
bind_addr: *bind_addr,
outlet_addr: outlet_addr.clone(),
outlet_addresses: outlet_addresses.to_owned(),
alias: alias.to_string(),
privileged,
}
Expand All @@ -96,12 +96,12 @@ impl TcpInlet {
self.bind_addr
}

pub fn outlet_addr(&self) -> MultiAddr {
self.outlet_addr.clone()
pub fn outlet_addr(&self) -> &Vec<MultiAddr> {
&self.outlet_addresses
}

pub fn alias(&self) -> String {
self.alias.clone()
pub fn alias(&self) -> &str {
&self.alias
}

pub fn privileged(&self) -> bool {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use itertools::Itertools;
use sqlx::*;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;

use sqlx::*;
use tracing::debug;

use crate::cli_state::storage::tcp_portals_repository::TcpPortalsRepository;
Expand Down Expand Up @@ -63,7 +63,13 @@ impl TcpPortalsRepository for TcpPortalsSqlxDatabase {
)
.bind(node_name)
.bind(tcp_inlet.bind_addr().to_string())
.bind(tcp_inlet.outlet_addr().to_string())
.bind(
tcp_inlet
.outlet_addr()
.iter()
.map(|x| x.to_string())
.join("//"),
)
.bind(tcp_inlet.alias())
.bind(tcp_inlet.privileged());
query.execute(&*self.database.pool).await.void()?;
Expand Down Expand Up @@ -158,18 +164,26 @@ struct TcpInletRow {
impl TcpInletRow {
fn bind_addr(&self) -> Result<SocketAddr> {
SocketAddr::from_str(&self.bind_addr)
.map_err(|e| ockam_core::Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))
.map_err(|e| Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))
}

fn outlet_addr(&self) -> Result<MultiAddr> {
MultiAddr::from_str(&self.outlet_addr)
.map_err(|e| ockam_core::Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))
fn outlet_addresses(&self) -> Result<Vec<MultiAddr>> {
let mut multiaddresses = Vec::new();

for addr in self.outlet_addr.split("//") {
multiaddresses.push(
MultiAddr::from_str(addr)
.map_err(|e| Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))?,
);
}

Ok(multiaddresses)
}

fn tcp_inlet(&self) -> Result<TcpInlet> {
Ok(TcpInlet::new(
&self.bind_addr()?,
&self.outlet_addr()?,
&self.outlet_addresses()?,
&self.alias,
self.privileged.to_bool(),
))
Expand Down Expand Up @@ -212,7 +226,7 @@ mod tests {

let tcp_inlet = TcpInlet::new(
&SocketAddr::from_str("127.0.0.1:80").unwrap(),
&MultiAddr::from_str("/node/outlet").unwrap(),
&["/node/outlet1".parse()?, "/node/outlet2".parse()?],
"alias",
true,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ impl CliState {
&self,
node_name: &str,
bind_addr: &SocketAddr,
outlet_addr: &MultiAddr,
outlet_addresses: &[MultiAddr],
alias: &str,
privileged: bool,
) -> Result<TcpInlet> {
let tcp_inlet = TcpInlet::new(bind_addr, outlet_addr, alias, privileged);
let tcp_inlet = TcpInlet::new(bind_addr, outlet_addresses, alias, privileged);
self.tcp_portals_repository()
.store_tcp_inlet(node_name, &tcp_inlet)
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use ockam_core::Route;
use ockam_multiaddr::MultiAddr;
use ockam_node::Context;
use std::sync::Arc;
use std::time::Duration;

impl HttpControlNodeApiBackend {
pub(super) async fn handle_tcp_inlet(
Expand All @@ -35,7 +36,7 @@ impl HttpControlNodeApiBackend {
},
Method::DELETE => match resource_id {
None => ControlApiHttpResponse::missing_resource_id(ResourceKind::TcpInlets),
Some(id) => handle_tcp_inlet_delete(&self.node_manager, id).await,
Some(id) => handle_tcp_inlet_delete(context, &self.node_manager, id).await,
},
_ => {
warn!("Invalid method: {method}");
Expand Down Expand Up @@ -153,16 +154,30 @@ async fn handle_tcp_inlet_create(
)?),
};

if request.to.is_empty() {
return ControlApiHttpResponse::bad_request("`to` must not be empty");
}

let to = {
let mut to = Vec::new();
for address in request.to.iter() {
to.push(address.parse()?);
}
to
};

let result = node_manager
.create_inlet(
context,
request.from.try_into()?,
Route::default(),
Route::default(),
request.to.parse()?,
request.target_redundancy.unwrap_or(to.len() - 1),
to,
request.name.unwrap_or_else(random_string),
allow,
None,
Some(Duration::from_millis(request.ping_timeout)),
authorized,
false,
None,
Expand Down Expand Up @@ -286,10 +301,11 @@ async fn handle_tcp_inlet_list(
)
)]
async fn handle_tcp_inlet_delete(
context: &Context,
node_manager: &Arc<NodeManager>,
resource_id: &str,
) -> Result<ControlApiHttpResponse, ControlApiError> {
let result = node_manager.delete_inlet(resource_id).await;
let result = node_manager.delete_inlet(context, resource_id).await;
match result {
Ok(_) => Ok(ControlApiHttpResponse::without_body(
StatusCode::NO_CONTENT,
Expand Down Expand Up @@ -362,11 +378,13 @@ mod test {
hostname: "127.0.0.1".to_string(),
port: 0,
},
to: "/service/outlet".to_string(),
to: vec!["/service/outlet".to_string()],
target_redundancy: None,
identity: None,
authorized: None,
allow: None,
retry_wait: 1000,
retry_wait: 1_000,
ping_timeout: 1_000,
})
.unwrap(),
),
Expand All @@ -384,8 +402,8 @@ mod test {
let inlet_status: InletStatus = serde_json::from_slice(response.body.as_slice()).unwrap();
assert_eq!(inlet_status.name, "inlet-name");
assert_eq!(inlet_status.status, ConnectionStatus::Down);
assert_eq!(inlet_status.current_route, None);
assert_eq!(inlet_status.to, "/service/outlet");
assert!(inlet_status.active_routes.is_empty());
assert_eq!(inlet_status.to, vec!["/service/outlet"]);
assert_eq!(inlet_status.bind_address.hostname, "127.0.0.1");
assert!(inlet_status.bind_address.port > 0);

Expand All @@ -408,8 +426,8 @@ mod test {
let inlet_status: InletStatus = serde_json::from_slice(response.body.as_slice()).unwrap();
assert_eq!(inlet_status.name, "inlet-name");
assert_eq!(inlet_status.status, ConnectionStatus::Up);
assert_eq!(inlet_status.current_route, Some("0#outlet".to_string()));
assert_eq!(inlet_status.to, "/service/outlet");
assert_eq!(inlet_status.active_routes, vec!["0#outlet".to_string()]);
assert_eq!(inlet_status.to, vec!["/service/outlet"]);

let request = ControlApiHttpRequest {
method: "GET".to_string(),
Expand All @@ -429,8 +447,8 @@ mod test {
assert_eq!(inlets.len(), 1);
assert_eq!(inlets[0].name, "inlet-name");
assert_eq!(inlets[0].status, ConnectionStatus::Up);
assert_eq!(inlets[0].current_route, Some("0#outlet".to_string()));
assert_eq!(inlets[0].to, "/service/outlet");
assert_eq!(inlets[0].active_routes, vec!["0#outlet".to_string()]);
assert_eq!(inlets[0].to, vec!["/service/outlet"]);

let request = ControlApiHttpRequest {
method: "DELETE".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ fn retry_wait_default() -> u64 {
20000
}

fn ping_timeout_default() -> u64 {
10_000
}

#[derive(Debug, Serialize, Deserialize, Default, ToSchema)]
#[serde(rename_all = "kebab-case")]
pub enum InletKind {
Expand Down Expand Up @@ -65,9 +69,13 @@ pub struct CreateInletRequest {
#[serde(default = "tcp_inlet_default_bind_address")]
#[schema(default = tcp_inlet_default_bind_address)]
pub from: HostnamePort,
/// Multiaddress to a TCP Outlet
/// Multiaddresses to a TCP Outlet
#[schema(example = "/project/default/service/forward_to_node1/secure/api/service/outlet")]
pub to: String,
pub to: Vec<String>,
/// Target redundancy for the TCP Inlet routes; 0 means only one route is instantiated
/// When omitted, the number of provided Multiaddresses minus one applies
#[serde(default)]
pub target_redundancy: Option<usize>,
/// Identity to be used to create the secure channel;
/// When omitted, the node's identity will be used
pub identity: Option<String>,
Expand All @@ -84,6 +92,11 @@ pub struct CreateInletRequest {
#[serde(default = "retry_wait_default")]
#[schema(default = retry_wait_default)]
pub retry_wait: u64,
/// How long until the outlet route is considered disconnected;
/// In milliseconds
#[serde(default = "ping_timeout_default")]
#[schema(default = ping_timeout_default)]
pub ping_timeout: u64,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "kebab-case")]
Expand All @@ -103,24 +116,33 @@ pub struct InletStatus {
pub status: ConnectionStatus,
/// Bind address of the TCP Inlet
pub bind_address: HostnamePort,
/// The current route of the TCP Inlet, populated only when the status is `up`
pub current_route: Option<String>,
/// Multiaddress to the TCP Outlet
pub to: String,
/// The active route of the TCP Inlet, empty when the connection is down
pub active_routes: Vec<String>,
/// The number of target redundant routes, 0 means only one route is instantiated
pub target_redundancy: usize,
/// Multiaddresses to the TCP Outlet
pub to: Vec<String>,
}

impl TryFrom<crate::nodes::models::portal::InletStatus> for InletStatus {
impl TryFrom<crate::nodes::models::portal::InletStatusView> for InletStatus {
type Error = ockam_core::Error;

fn try_from(status: crate::nodes::models::portal::InletStatus) -> Result<Self, Self::Error> {
let bind_address = HostnamePort::try_from(status.bind_addr.as_str())?;
fn try_from(
status: crate::nodes::models::portal::InletStatusView,
) -> Result<Self, Self::Error> {
let bind_address = HostnamePort::try_from(status.bind_address.as_str())?;

Ok(InletStatus {
status: status.status.into(),
status: status.connection.into(),
bind_address,
name: status.alias,
current_route: status.outlet_route.map(|r| r.to_string()),
to: status.outlet_addr,
active_routes: status
.outlet_routes
.into_iter()
.map(|r| r.to_string())
.collect(),
target_redundancy: status.target_redundancy,
to: status.outlet_addresses,
})
}
}
Loading

0 comments on commit 1087b99

Please sign in to comment.