diff --git a/Cargo.lock b/Cargo.lock index 914221a1b15..81614036ccc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4225,6 +4225,7 @@ dependencies = [ "base64-url", "bytes 1.5.0", "cddl-cat", + "cfg-if", "chrono", "colorful", "either", diff --git a/Cargo.toml b/Cargo.toml index 85edce3e7af..23a17e0e325 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ inherits = "test" debug = 1 strip = "none" inherits = "release" +force-frame-pointers = "yes" # compromise: minimal optimization on selected dependencies # to reduce cli bootstrap time by ~5x diff --git a/implementations/rust/ockam/ockam_api/Cargo.toml b/implementations/rust/ockam/ockam_api/Cargo.toml index a437cb58e05..6a5e15d1ef7 100644 --- a/implementations/rust/ockam/ockam_api/Cargo.toml +++ b/implementations/rust/ockam/ockam_api/Cargo.toml @@ -32,6 +32,7 @@ storage = ["ockam/storage"] aws-config = { version = "1.1.8", default-features = false, features = ["rustls"] } base64-url = "2.0.2" bytes = { version = "1.5.0", default-features = false, features = ["serde"] } +cfg-if = "1.0.0" chrono = { version = "0.4" } colorful = "0.2" either = { version = "1.10.0", default-features = false } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service.rs b/implementations/rust/ockam/ockam_api/src/nodes/service.rs index 7134edb42d6..b1d915dd5f7 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service.rs @@ -249,7 +249,16 @@ impl NodeManager { authority, ) .await?; - Ok(Arc::new(policy_access_control)) + + cfg_if::cfg_if! { + if #[cfg(feature = "std")] { + let cached_policy_access_control = ockam_core::access_control::CachedIncomingAccessControl::new( + Box::new(policy_access_control)); + Ok(Arc::new(cached_policy_access_control)) + } else { + Ok(Arc::new(policy_access_control)) + } + } } else { warn! { resource_name = resource_name_str, diff --git a/implementations/rust/ockam/ockam_core/src/access_control.rs b/implementations/rust/ockam/ockam_core/src/access_control.rs index 9201da07017..2b3bde9371e 100644 --- a/implementations/rust/ockam/ockam_core/src/access_control.rs +++ b/implementations/rust/ockam/ockam_core/src/access_control.rs @@ -67,6 +67,8 @@ pub trait OutgoingAccessControl: Debug + Send + Sync + 'static { mod all; mod allow_all; mod any; +#[cfg(feature = "std")] +mod cache; mod deny_all; mod onward; mod source; @@ -74,6 +76,8 @@ mod source; pub use all::*; pub use allow_all::*; pub use any::*; +#[cfg(feature = "std")] +pub use cache::*; pub use deny_all::*; pub use onward::*; pub use source::*; diff --git a/implementations/rust/ockam/ockam_core/src/access_control/cache.rs b/implementations/rust/ockam/ockam_core/src/access_control/cache.rs new file mode 100644 index 00000000000..baadff1d825 --- /dev/null +++ b/implementations/rust/ockam/ockam_core/src/access_control/cache.rs @@ -0,0 +1,419 @@ +use crate::compat::sync::Mutex; +use crate::{ + Address, IncomingAccessControl, LocalInfo, OutgoingAccessControl, RelayMessage, Route, +}; +use alloc::vec::Vec; +use async_trait::async_trait; +use core::fmt::Debug; +use std::time::Instant; + +const CACHE_MAX_SIZE: usize = 10; +const CACHE_DURATION_SECS: u64 = 1; + +#[derive(Debug)] +struct CacheEntry { + source: Address, + destination: Address, + onward_route: Route, + return_route: Route, + local_info: Vec, + timestamp: Instant, +} + +impl CacheEntry { + fn from(relay_message: &RelayMessage) -> Self { + Self { + source: relay_message.source().clone(), + destination: relay_message.destination().clone(), + onward_route: relay_message.onward_route().clone(), + return_route: relay_message.return_route().clone(), + local_info: relay_message.local_message().local_info(), + timestamp: Instant::now(), + } + } + + /// Returns true if the cache entry is expired. + fn is_expired(&self) -> bool { + self.timestamp.elapsed().as_secs() >= CACHE_DURATION_SECS + } + + /// Returns true if the relay message matches the cache entry. + /// Everything except the payload is compared. + fn matches(&self, relay_message: &RelayMessage) -> bool { + self.source == *relay_message.source() + && self.destination == *relay_message.destination() + && self.onward_route == *relay_message.onward_route() + && self.return_route == *relay_message.return_route() + && self.local_info == relay_message.local_message().local_info_ref() + } +} + +#[derive(Debug)] +struct Cache { + cache: Mutex>, +} + +impl Cache { + pub fn new() -> Self { + Self { + cache: Mutex::new(Vec::new()), + } + } + + /// Returns true if the relay message is in the cache and not expired. + pub fn exist_in_cache(&self, relay_message: &RelayMessage) -> bool { + let mut cache_guard = self.cache.lock().unwrap(); + let position = cache_guard + .iter() + .position(|entry| entry.matches(relay_message)); + if let Some(position) = position { + let entry = cache_guard.get_mut(position).unwrap(); + if entry.is_expired() { + cache_guard.remove(position); + false + } else { + true + } + } else { + false + } + } + + /// Adds the relay message to the cache. + pub fn add_authorized(&self, relay_message: &RelayMessage) { + let mut cache_guard = self.cache.lock().unwrap(); + let position = cache_guard + .iter() + .position(|entry| entry.matches(relay_message)); + if let Some(position) = position { + cache_guard.remove(position); + } + cache_guard.push(CacheEntry::from(relay_message)); + if cache_guard.len() > CACHE_MAX_SIZE { + cache_guard.remove(0); + } + } +} + +/// A wrapper for an incoming access control that caches successful authorizations. +/// The message is considered the same if everything except the payload is the same. +/// Keeps a cache of the last [`CACHE_MAX_SIZE`] authorized messages with validity of +/// [`CACHE_DURATION_SECS`] seconds. +#[derive(Debug)] +pub struct CachedIncomingAccessControl { + cache: Cache, + access_control: Box, +} + +impl CachedIncomingAccessControl { + /// Wraps an incoming access control with a cache. + pub fn new(access_control: Box) -> Self { + Self { + cache: Cache::new(), + access_control, + } + } +} + +#[async_trait] +impl IncomingAccessControl for CachedIncomingAccessControl { + async fn is_authorized(&self, relay_msg: &RelayMessage) -> crate::Result { + if self.cache.exist_in_cache(relay_msg) { + return crate::allow(); + } + let is_authorized = self.access_control.is_authorized(relay_msg).await?; + if is_authorized { + self.cache.add_authorized(relay_msg); + crate::allow() + } else { + crate::deny() + } + } +} + +/// A wrapper for an outgoing access control that caches successful authorizations. +/// The message is considered the same if everything except the payload is the same. +/// Keeps a cache of the last [`CACHE_MAX_SIZE`] authorized messages with validity of +/// [`CACHE_DURATION_SECS`] seconds. +#[derive(Debug)] +pub struct CachedOutgoingAccessControl { + cache: Cache, + access_control: Box, +} + +impl CachedOutgoingAccessControl { + /// Wraps an outgoing access control with a cache. + pub fn new(access_control: Box) -> Self { + Self { + cache: Cache::new(), + access_control, + } + } +} + +#[async_trait] +impl OutgoingAccessControl for CachedOutgoingAccessControl { + async fn is_authorized(&self, relay_msg: &RelayMessage) -> crate::Result { + if self.cache.exist_in_cache(relay_msg) { + return crate::allow(); + } + let is_authorized = self.access_control.is_authorized(relay_msg).await?; + if is_authorized { + self.cache.add_authorized(relay_msg); + crate::allow() + } else { + crate::deny() + } + } +} + +#[cfg(test)] +pub mod test { + use crate::access_control::cache::{CacheEntry, CACHE_DURATION_SECS}; + use crate::{ + route, Address, IncomingAccessControl, LocalInfo, OutgoingAccessControl, RelayMessage, + }; + use async_trait::async_trait; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + use std::time::Duration; + use std::time::Instant; + use tokio::time::sleep; + + #[derive(Debug)] + struct DebugAccessControl { + authorized: Arc, + } + + #[async_trait] + impl IncomingAccessControl for DebugAccessControl { + async fn is_authorized(&self, _relay_msg: &RelayMessage) -> crate::Result { + Ok(self.authorized.load(Ordering::Relaxed)) + } + } + + #[async_trait] + impl OutgoingAccessControl for DebugAccessControl { + async fn is_authorized(&self, _relay_msg: &RelayMessage) -> crate::Result { + Ok(self.authorized.load(Ordering::Relaxed)) + } + } + fn relay_message() -> RelayMessage { + RelayMessage::new( + Address::random_local(), + Address::random_local(), + crate::LocalMessage::new() + .with_onward_route(route!["onward"]) + .with_return_route(route!["return"]) + .with_local_info(vec![LocalInfo::new("type".into(), vec![1, 2, 3])]), + ) + } + + #[tokio::test] + pub async fn incoming_access_control() { + let authorized = Arc::new(AtomicBool::new(false)); + let access_control = DebugAccessControl { + authorized: authorized.clone(), + }; + + let access_control = crate::CachedIncomingAccessControl::new(Box::new(access_control)); + let relay_msg = relay_message(); + + // negative result is not cached + assert_eq!( + access_control.is_authorized(&relay_msg).await.unwrap(), + false + ); + authorized.store(true, Ordering::Relaxed); + assert_eq!( + access_control.is_authorized(&relay_msg).await.unwrap(), + true + ); + + // positive result is cached + authorized.store(false, Ordering::Relaxed); + assert_eq!( + access_control.is_authorized(&relay_msg).await.unwrap(), + true + ); + + // but it expires + sleep(Duration::from_millis(CACHE_DURATION_SECS * 1000 + 100)).await; + assert_eq!( + access_control.is_authorized(&relay_msg).await.unwrap(), + false + ); + + // positive result is cached again until the cache is full + authorized.store(true, Ordering::Relaxed); + assert_eq!( + access_control.is_authorized(&relay_msg).await.unwrap(), + true + ); + for _ in 0..crate::access_control::cache::CACHE_MAX_SIZE { + let different_relay_msg = relay_message(); + assert_eq!( + access_control + .is_authorized(&different_relay_msg) + .await + .unwrap(), + true + ); + } + // the relay message is no longer cached + authorized.store(false, Ordering::Relaxed); + assert_eq!( + access_control.is_authorized(&relay_msg).await.unwrap(), + false + ); + } + + #[tokio::test] + pub async fn outgoing_access_control() { + let authorized = Arc::new(AtomicBool::new(false)); + let access_control = DebugAccessControl { + authorized: authorized.clone(), + }; + + let access_control = crate::CachedOutgoingAccessControl::new(Box::new(access_control)); + let relay_msg = relay_message(); + + // negative result is not cached + assert_eq!( + access_control.is_authorized(&relay_msg).await.unwrap(), + false + ); + authorized.store(true, Ordering::Relaxed); + assert_eq!( + access_control.is_authorized(&relay_msg).await.unwrap(), + true + ); + + // positive result is cached + authorized.store(false, Ordering::Relaxed); + assert_eq!( + access_control.is_authorized(&relay_msg).await.unwrap(), + true + ); + + // but it expires + sleep(Duration::from_millis(CACHE_DURATION_SECS * 1000 + 100)).await; + assert_eq!( + access_control.is_authorized(&relay_msg).await.unwrap(), + false + ); + + // positive result is cached again until the cache is full + authorized.store(true, Ordering::Relaxed); + assert_eq!( + access_control.is_authorized(&relay_msg).await.unwrap(), + true + ); + for _ in 0..crate::access_control::cache::CACHE_MAX_SIZE { + let different_relay_msg = relay_message(); + assert_eq!( + access_control + .is_authorized(&different_relay_msg) + .await + .unwrap(), + true + ); + } + // the relay message is no longer cached + authorized.store(false, Ordering::Relaxed); + assert_eq!( + access_control.is_authorized(&relay_msg).await.unwrap(), + false + ); + } + + #[test] + pub fn cache_entry_matches() { + let relay_msg = relay_message(); + + // self matches + let entry = crate::access_control::cache::CacheEntry::from(&relay_msg); + assert!(entry.matches(&relay_msg)); + + // payload is ignored + let cloned = RelayMessage::new( + relay_msg.source().clone(), + relay_msg.destination().clone(), + relay_msg.local_message().clone().with_payload(vec![1]), + ); + assert!(entry.matches(&cloned)); + + // we check that if any field is different, the entry does not match + + // source + let cloned = RelayMessage::new( + Address::random_local(), + relay_msg.destination().clone(), + relay_msg.local_message().clone(), + ); + assert!(!entry.matches(&cloned)); + + // destination + let cloned = RelayMessage::new( + relay_msg.source().clone(), + Address::random_local(), + relay_msg.local_message().clone(), + ); + assert!(!entry.matches(&cloned)); + + // onward route + let cloned = RelayMessage::new( + relay_msg.source().clone(), + relay_msg.destination().clone(), + relay_msg + .local_message() + .clone() + .with_onward_route(route!["different"]), + ); + assert!(!entry.matches(&cloned)); + + // return route + let cloned = RelayMessage::new( + relay_msg.source().clone(), + relay_msg.destination().clone(), + relay_msg + .local_message() + .clone() + .with_return_route(route!["different"]), + ); + assert!(!entry.matches(&cloned)); + + // local info + let cloned = RelayMessage::new( + relay_msg.source().clone(), + relay_msg.destination().clone(), + relay_msg + .local_message() + .clone() + .with_local_info(vec![LocalInfo::new("type".into(), vec![1, 2, 3, 4])]), + ); + assert!(!entry.matches(&cloned)); + } + + #[test] + pub fn cache_entry_is_expired() { + let entry = CacheEntry { + source: Address::random_local(), + destination: Address::random_local(), + onward_route: route!["onward"], + return_route: route!["return"], + local_info: vec![], + timestamp: Instant::now(), + }; + + // not expired + assert!(!entry.is_expired()); + + // expired + let entry = CacheEntry { + timestamp: Instant::now() - Duration::from_secs(CACHE_DURATION_SECS), + ..entry + }; + assert!(entry.is_expired()); + } +}