Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): added support for multiple concurrent routes to tcp inlets #8832

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,21 @@ impl<T: TcpPortalsRepository> TcpPortalsRepository for AutoRetry<T> {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TcpInlet {
bind_addr: SocketAddr,
outlet_addr: MultiAddr,
outlet_addresses: Vec<MultiAddr>,
alias: String,
privileged: bool,
}

impl TcpInlet {
pub fn new(
bind_addr: &SocketAddr,
outlet_addr: &MultiAddr,
outlet_addresses: &[MultiAddr],
alias: &str,
privileged: bool,
) -> TcpInlet {
Self {
bind_addr: *bind_addr,
outlet_addr: outlet_addr.clone(),
outlet_addresses: outlet_addresses.to_owned(),
alias: alias.to_string(),
privileged,
}
Expand All @@ -96,12 +96,12 @@ impl TcpInlet {
self.bind_addr
}

pub fn outlet_addr(&self) -> MultiAddr {
self.outlet_addr.clone()
pub fn outlet_addr(&self) -> &Vec<MultiAddr> {
&self.outlet_addresses
}

pub fn alias(&self) -> String {
self.alias.clone()
pub fn alias(&self) -> &str {
&self.alias
}

pub fn privileged(&self) -> bool {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use itertools::Itertools;
use sqlx::*;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;

use sqlx::*;
use tracing::debug;

use crate::cli_state::storage::tcp_portals_repository::TcpPortalsRepository;
Expand Down Expand Up @@ -63,7 +63,13 @@ impl TcpPortalsRepository for TcpPortalsSqlxDatabase {
)
.bind(node_name)
.bind(tcp_inlet.bind_addr().to_string())
.bind(tcp_inlet.outlet_addr().to_string())
.bind(
tcp_inlet
.outlet_addr()
.iter()
.map(|x| x.to_string())
.join("//"),
)
.bind(tcp_inlet.alias())
.bind(tcp_inlet.privileged());
query.execute(&*self.database.pool).await.void()?;
Expand Down Expand Up @@ -158,18 +164,26 @@ struct TcpInletRow {
impl TcpInletRow {
fn bind_addr(&self) -> Result<SocketAddr> {
SocketAddr::from_str(&self.bind_addr)
.map_err(|e| ockam_core::Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))
.map_err(|e| Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))
}

fn outlet_addr(&self) -> Result<MultiAddr> {
MultiAddr::from_str(&self.outlet_addr)
.map_err(|e| ockam_core::Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))
fn outlet_addresses(&self) -> Result<Vec<MultiAddr>> {
let mut multiaddresses = Vec::new();

for addr in self.outlet_addr.split("//") {
multiaddresses.push(
MultiAddr::from_str(addr)
.map_err(|e| Error::new(Origin::Api, Kind::Serialization, format!("{e:?}")))?,
);
}

Ok(multiaddresses)
}

fn tcp_inlet(&self) -> Result<TcpInlet> {
Ok(TcpInlet::new(
&self.bind_addr()?,
&self.outlet_addr()?,
&self.outlet_addresses()?,
&self.alias,
self.privileged.to_bool(),
))
Expand Down Expand Up @@ -212,7 +226,7 @@ mod tests {

let tcp_inlet = TcpInlet::new(
&SocketAddr::from_str("127.0.0.1:80").unwrap(),
&MultiAddr::from_str("/node/outlet").unwrap(),
&["/node/outlet1".parse()?, "/node/outlet2".parse()?],
"alias",
true,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ impl CliState {
&self,
node_name: &str,
bind_addr: &SocketAddr,
outlet_addr: &MultiAddr,
outlet_addresses: &[MultiAddr],
alias: &str,
privileged: bool,
) -> Result<TcpInlet> {
let tcp_inlet = TcpInlet::new(bind_addr, outlet_addr, alias, privileged);
let tcp_inlet = TcpInlet::new(bind_addr, outlet_addresses, alias, privileged);
self.tcp_portals_repository()
.store_tcp_inlet(node_name, &tcp_inlet)
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use ockam_core::Route;
use ockam_multiaddr::MultiAddr;
use ockam_node::Context;
use std::sync::Arc;
use std::time::Duration;

impl HttpControlNodeApiBackend {
pub(super) async fn handle_tcp_inlet(
Expand All @@ -35,7 +36,7 @@ impl HttpControlNodeApiBackend {
},
Method::DELETE => match resource_id {
None => ControlApiHttpResponse::missing_resource_id(ResourceKind::TcpInlets),
Some(id) => handle_tcp_inlet_delete(&self.node_manager, id).await,
Some(id) => handle_tcp_inlet_delete(context, &self.node_manager, id).await,
},
_ => {
warn!("Invalid method: {method}");
Expand Down Expand Up @@ -153,16 +154,30 @@ async fn handle_tcp_inlet_create(
)?),
};

if request.to.is_empty() {
return ControlApiHttpResponse::bad_request("`to` must not be empty");
}

let to = {
let mut to = Vec::new();
for address in request.to.iter() {
to.push(address.parse()?);
}
to
};

let result = node_manager
.create_inlet(
context,
request.from.try_into()?,
Route::default(),
Route::default(),
request.to.parse()?,
request.target_redundancy.unwrap_or(to.len() - 1),
to,
request.name.unwrap_or_else(random_string),
allow,
None,
Some(Duration::from_millis(request.ping_timeout)),
authorized,
false,
None,
Expand Down Expand Up @@ -286,10 +301,11 @@ async fn handle_tcp_inlet_list(
)
)]
async fn handle_tcp_inlet_delete(
context: &Context,
node_manager: &Arc<NodeManager>,
resource_id: &str,
) -> Result<ControlApiHttpResponse, ControlApiError> {
let result = node_manager.delete_inlet(resource_id).await;
let result = node_manager.delete_inlet(context, resource_id).await;
match result {
Ok(_) => Ok(ControlApiHttpResponse::without_body(
StatusCode::NO_CONTENT,
Expand Down Expand Up @@ -362,11 +378,13 @@ mod test {
hostname: "127.0.0.1".to_string(),
port: 0,
},
to: "/service/outlet".to_string(),
to: vec!["/service/outlet".to_string()],
target_redundancy: None,
identity: None,
authorized: None,
allow: None,
retry_wait: 1000,
retry_wait: 1_000,
ping_timeout: 1_000,
})
.unwrap(),
),
Expand All @@ -384,8 +402,8 @@ mod test {
let inlet_status: InletStatus = serde_json::from_slice(response.body.as_slice()).unwrap();
assert_eq!(inlet_status.name, "inlet-name");
assert_eq!(inlet_status.status, ConnectionStatus::Down);
assert_eq!(inlet_status.current_route, None);
assert_eq!(inlet_status.to, "/service/outlet");
assert!(inlet_status.active_routes.is_empty());
assert_eq!(inlet_status.to, vec!["/service/outlet"]);
assert_eq!(inlet_status.bind_address.hostname, "127.0.0.1");
assert!(inlet_status.bind_address.port > 0);

Expand All @@ -408,8 +426,8 @@ mod test {
let inlet_status: InletStatus = serde_json::from_slice(response.body.as_slice()).unwrap();
assert_eq!(inlet_status.name, "inlet-name");
assert_eq!(inlet_status.status, ConnectionStatus::Up);
assert_eq!(inlet_status.current_route, Some("0#outlet".to_string()));
assert_eq!(inlet_status.to, "/service/outlet");
assert_eq!(inlet_status.active_routes, vec!["0#outlet".to_string()]);
assert_eq!(inlet_status.to, vec!["/service/outlet"]);

let request = ControlApiHttpRequest {
method: "GET".to_string(),
Expand All @@ -429,8 +447,8 @@ mod test {
assert_eq!(inlets.len(), 1);
assert_eq!(inlets[0].name, "inlet-name");
assert_eq!(inlets[0].status, ConnectionStatus::Up);
assert_eq!(inlets[0].current_route, Some("0#outlet".to_string()));
assert_eq!(inlets[0].to, "/service/outlet");
assert_eq!(inlets[0].active_routes, vec!["0#outlet".to_string()]);
assert_eq!(inlets[0].to, vec!["/service/outlet"]);

let request = ControlApiHttpRequest {
method: "DELETE".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ fn retry_wait_default() -> u64 {
20000
}

fn ping_timeout_default() -> u64 {
10_000
}

#[derive(Debug, Serialize, Deserialize, Default, ToSchema)]
#[serde(rename_all = "kebab-case")]
pub enum InletKind {
Expand Down Expand Up @@ -65,9 +69,13 @@ pub struct CreateInletRequest {
#[serde(default = "tcp_inlet_default_bind_address")]
#[schema(default = tcp_inlet_default_bind_address)]
pub from: HostnamePort,
/// Multiaddress to a TCP Outlet
/// Multiaddresses to a TCP Outlet
#[schema(example = "/project/default/service/forward_to_node1/secure/api/service/outlet")]
pub to: String,
pub to: Vec<String>,
/// Target redundancy for the TCP Inlet routes; 0 means only one route is instantiated
/// When omitted, the number of provided Multiaddresses minus one applies
#[serde(default)]
pub target_redundancy: Option<usize>,
/// Identity to be used to create the secure channel;
/// When omitted, the node's identity will be used
pub identity: Option<String>,
Expand All @@ -84,6 +92,11 @@ pub struct CreateInletRequest {
#[serde(default = "retry_wait_default")]
#[schema(default = retry_wait_default)]
pub retry_wait: u64,
/// How long until the outlet route is considered disconnected;
/// In milliseconds
#[serde(default = "ping_timeout_default")]
#[schema(default = ping_timeout_default)]
pub ping_timeout: u64,
}
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "kebab-case")]
Expand All @@ -103,24 +116,33 @@ pub struct InletStatus {
pub status: ConnectionStatus,
/// Bind address of the TCP Inlet
pub bind_address: HostnamePort,
/// The current route of the TCP Inlet, populated only when the status is `up`
pub current_route: Option<String>,
/// Multiaddress to the TCP Outlet
pub to: String,
/// The active route of the TCP Inlet, empty when the connection is down
pub active_routes: Vec<String>,
/// The number of target redundant routes, 0 means only one route is instantiated
pub target_redundancy: usize,
/// Multiaddresses to the TCP Outlet
pub to: Vec<String>,
}

impl TryFrom<crate::nodes::models::portal::InletStatus> for InletStatus {
impl TryFrom<crate::nodes::models::portal::InletStatusView> for InletStatus {
type Error = ockam_core::Error;

fn try_from(status: crate::nodes::models::portal::InletStatus) -> Result<Self, Self::Error> {
let bind_address = HostnamePort::try_from(status.bind_addr.as_str())?;
fn try_from(
status: crate::nodes::models::portal::InletStatusView,
) -> Result<Self, Self::Error> {
let bind_address = HostnamePort::try_from(status.bind_address.as_str())?;

Ok(InletStatus {
status: status.status.into(),
status: status.connection.into(),
bind_address,
name: status.alias,
current_route: status.outlet_route.map(|r| r.to_string()),
to: status.outlet_addr,
active_routes: status
.outlet_routes
.into_iter()
.map(|r| r.to_string())
.collect(),
target_redundancy: status.target_redundancy,
to: status.outlet_addresses,
})
}
}
Loading
Loading