From 3a42a87563cebe2e3f0430e607d8ec9704bdf2cb Mon Sep 17 00:00:00 2001 From: Davide Baldo Date: Wed, 20 Mar 2024 15:09:34 +0100 Subject: [PATCH 1/2] feat(rust): add one second cache for incoming and outgoing access control --- Cargo.lock | 1 + Cargo.toml | 1 + .../rust/ockam/ockam_api/Cargo.toml | 1 + .../rust/ockam/ockam_api/src/nodes/service.rs | 11 +- .../ockam/ockam_core/src/access_control.rs | 4 + .../ockam_core/src/access_control/cache.rs | 349 ++++++++++++++++++ 6 files changed, 366 insertions(+), 1 deletion(-) create mode 100644 implementations/rust/ockam/ockam_core/src/access_control/cache.rs diff --git a/Cargo.lock b/Cargo.lock index ff1541ba77a..bc1d5c7388d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4225,6 +4225,7 @@ dependencies = [ "base64-url", "bytes 1.5.0", "cddl-cat", + "cfg-if", "chrono", "colorful", "either", diff --git a/Cargo.toml b/Cargo.toml index 85edce3e7af..23a17e0e325 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ inherits = "test" debug = 1 strip = "none" inherits = "release" +force-frame-pointers = "yes" # compromise: minimal optimization on selected dependencies # to reduce cli bootstrap time by ~5x diff --git a/implementations/rust/ockam/ockam_api/Cargo.toml b/implementations/rust/ockam/ockam_api/Cargo.toml index a437cb58e05..6a5e15d1ef7 100644 --- a/implementations/rust/ockam/ockam_api/Cargo.toml +++ b/implementations/rust/ockam/ockam_api/Cargo.toml @@ -32,6 +32,7 @@ storage = ["ockam/storage"] aws-config = { version = "1.1.8", default-features = false, features = ["rustls"] } base64-url = "2.0.2" bytes = { version = "1.5.0", default-features = false, features = ["serde"] } +cfg-if = "1.0.0" chrono = { version = "0.4" } colorful = "0.2" either = { version = "1.10.0", default-features = false } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service.rs b/implementations/rust/ockam/ockam_api/src/nodes/service.rs index 6c5f0389982..f4e1d2031d9 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service.rs @@ -249,7 +249,16 @@ impl NodeManager { authority, ) .await?; - Ok(Arc::new(policy_access_control)) + + cfg_if::cfg_if! { + if #[cfg(feature = "std")] { + let cached_policy_access_control = ockam_core::access_control::CachedIncomingAccessControl::new( + Box::new(policy_access_control)); + Ok(Arc::new(cached_policy_access_control)) + } else { + Ok(Arc::new(policy_access_control)) + } + } } else { warn! { resource_name = resource_name_str, diff --git a/implementations/rust/ockam/ockam_core/src/access_control.rs b/implementations/rust/ockam/ockam_core/src/access_control.rs index 9201da07017..2b3bde9371e 100644 --- a/implementations/rust/ockam/ockam_core/src/access_control.rs +++ b/implementations/rust/ockam/ockam_core/src/access_control.rs @@ -67,6 +67,8 @@ pub trait OutgoingAccessControl: Debug + Send + Sync + 'static { mod all; mod allow_all; mod any; +#[cfg(feature = "std")] +mod cache; mod deny_all; mod onward; mod source; @@ -74,6 +76,8 @@ mod source; pub use all::*; pub use allow_all::*; pub use any::*; +#[cfg(feature = "std")] +pub use cache::*; pub use deny_all::*; pub use onward::*; pub use source::*; diff --git a/implementations/rust/ockam/ockam_core/src/access_control/cache.rs b/implementations/rust/ockam/ockam_core/src/access_control/cache.rs new file mode 100644 index 00000000000..11ba5884aaa --- /dev/null +++ b/implementations/rust/ockam/ockam_core/src/access_control/cache.rs @@ -0,0 +1,349 @@ +use crate::compat::sync::Mutex; +use crate::{ + Address, IncomingAccessControl, LocalInfo, OutgoingAccessControl, RelayMessage, Route, +}; +use alloc::vec::Vec; +use async_trait::async_trait; +use core::fmt::Debug; +use std::time::Instant; + +const CACHE_MAX_SIZE: usize = 10; +const CACHE_DURATION_SECS: u64 = 1; + +#[derive(Debug)] +struct CacheEntry { + source: Address, + destination: Address, + onward_route: Route, + return_route: Route, + local_info: Vec, + timestamp: Instant, +} + +impl CacheEntry { + fn from(relay_message: &RelayMessage) -> Self { + Self { + source: relay_message.source().clone(), + destination: relay_message.destination().clone(), + onward_route: relay_message.onward_route().clone(), + return_route: relay_message.return_route().clone(), + local_info: relay_message.local_message().local_info(), + timestamp: Instant::now(), + } + } + + /// Returns true if the cache entry is expired. + fn is_expired(&self) -> bool { + self.timestamp.elapsed().as_secs() >= CACHE_DURATION_SECS + } + + /// Returns true if the relay message matches the cache entry. + /// Everything except the payload is compared. + fn matches(&self, relay_message: &RelayMessage) -> bool { + self.source == *relay_message.source() + && self.destination == *relay_message.destination() + && self.onward_route == *relay_message.onward_route() + && self.return_route == *relay_message.return_route() + && self.local_info == relay_message.local_message().local_info_ref() + } +} + +#[derive(Debug)] +struct Cache { + cache: Mutex>, +} + +impl Cache { + pub fn new() -> Self { + Self { + cache: Mutex::new(Vec::new()), + } + } + + /// Returns true if the relay message is in the cache and not expired. + pub fn exist_in_cache(&self, relay_message: &RelayMessage) -> bool { + let mut cache_guard = self.cache.lock().unwrap(); + cache_guard + .iter() + .position(|entry| entry.matches(relay_message)) + .map(|position| { + if cache_guard[position].is_expired() { + cache_guard.remove(position); + false + } else { + true + } + }) + .unwrap_or(false) + } + + /// Adds the relay message to the cache. + pub fn add_authorized(&self, relay_message: &RelayMessage) { + let mut cache_guard = self.cache.lock().unwrap(); + let position = cache_guard + .iter() + .position(|entry| entry.matches(relay_message)); + if let Some(position) = position { + cache_guard.remove(position); + } + cache_guard.push(CacheEntry::from(relay_message)); + if cache_guard.len() > CACHE_MAX_SIZE { + cache_guard.remove(0); + } + } +} + +/// A wrapper for an incoming access control that caches successful authorizations. +/// The message is considered the same if everything except the payload is the same. +/// Keeps a cache of the last [`CACHE_MAX_SIZE`] authorized messages with validity of +/// [`CACHE_DURATION_SECS`] seconds. +#[derive(Debug)] +pub struct CachedIncomingAccessControl { + cache: Cache, + access_control: Box, +} + +impl CachedIncomingAccessControl { + /// Wraps an incoming access control with a cache. + pub fn new(access_control: Box) -> Self { + Self { + cache: Cache::new(), + access_control, + } + } +} + +#[async_trait] +impl IncomingAccessControl for CachedIncomingAccessControl { + async fn is_authorized(&self, relay_msg: &RelayMessage) -> crate::Result { + if self.cache.exist_in_cache(relay_msg) { + return crate::allow(); + } + let is_authorized = self.access_control.is_authorized(relay_msg).await?; + if is_authorized { + self.cache.add_authorized(relay_msg); + crate::allow() + } else { + crate::deny() + } + } +} + +/// A wrapper for an outgoing access control that caches successful authorizations. +/// The message is considered the same if everything except the payload is the same. +/// Keeps a cache of the last [`CACHE_MAX_SIZE`] authorized messages with validity of +/// [`CACHE_DURATION_SECS`] seconds. +#[derive(Debug)] +pub struct CachedOutgoingAccessControl { + cache: Cache, + access_control: Box, +} + +impl CachedOutgoingAccessControl { + /// Wraps an outgoing access control with a cache. + pub fn new(access_control: Box) -> Self { + Self { + cache: Cache::new(), + access_control, + } + } +} + +#[async_trait] +impl OutgoingAccessControl for CachedOutgoingAccessControl { + async fn is_authorized(&self, relay_msg: &RelayMessage) -> crate::Result { + if self.cache.exist_in_cache(relay_msg) { + return crate::allow(); + } + let is_authorized = self.access_control.is_authorized(relay_msg).await?; + if is_authorized { + self.cache.add_authorized(relay_msg); + crate::allow() + } else { + crate::deny() + } + } +} + +#[cfg(test)] +pub mod test { + use crate::access_control::cache::{CacheEntry, CACHE_DURATION_SECS}; + use crate::{ + route, Address, IncomingAccessControl, LocalInfo, OutgoingAccessControl, RelayMessage, + }; + use async_trait::async_trait; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + use std::time::Duration; + use std::time::Instant; + use tokio::time::sleep; + + #[derive(Debug)] + struct DebugAccessControl { + authorized: Arc, + } + + #[async_trait] + impl IncomingAccessControl for DebugAccessControl { + async fn is_authorized(&self, _relay_msg: &RelayMessage) -> crate::Result { + Ok(self.authorized.load(Ordering::Relaxed)) + } + } + + #[async_trait] + impl OutgoingAccessControl for DebugAccessControl { + async fn is_authorized(&self, _relay_msg: &RelayMessage) -> crate::Result { + Ok(self.authorized.load(Ordering::Relaxed)) + } + } + fn relay_message() -> RelayMessage { + RelayMessage::new( + Address::random_local(), + Address::random_local(), + crate::LocalMessage::new() + .with_onward_route(route!["onward"]) + .with_return_route(route!["return"]) + .with_local_info(vec![LocalInfo::new("type".into(), vec![1, 2, 3])]), + ) + } + + // deduplicated test for incoming and outgoing access control + macro_rules! access_policy_test { + ($struct_name:tt) => { + let authorized = Arc::new(AtomicBool::new(false)); + let access_control = DebugAccessControl { + authorized: authorized.clone(), + }; + + let access_control = crate::$struct_name::new(Box::new(access_control)); + let relay_msg = relay_message(); + + // negative result is not cached + assert!(!access_control.is_authorized(&relay_msg).await.unwrap()); + authorized.store(true, Ordering::Relaxed); + assert!(access_control.is_authorized(&relay_msg).await.unwrap()); + + // positive result is cached + authorized.store(false, Ordering::Relaxed); + assert!(access_control.is_authorized(&relay_msg).await.unwrap()); + + // but it expires + sleep(Duration::from_millis(CACHE_DURATION_SECS * 1000 + 100)).await; + assert!(!access_control.is_authorized(&relay_msg).await.unwrap()); + + // positive result is cached again until the cache is full + authorized.store(true, Ordering::Relaxed); + assert!(access_control.is_authorized(&relay_msg).await.unwrap()); + for _ in 0..crate::access_control::cache::CACHE_MAX_SIZE { + let different_relay_msg = relay_message(); + assert!(access_control + .is_authorized(&different_relay_msg) + .await + .unwrap()); + } + // the relay message is no longer cached + authorized.store(false, Ordering::Relaxed); + assert!(!access_control.is_authorized(&relay_msg).await.unwrap()); + }; + } + + #[tokio::test] + pub async fn incoming_access_control() { + access_policy_test!(CachedIncomingAccessControl); + } + + #[tokio::test] + pub async fn outgoing_access_control() { + access_policy_test!(CachedOutgoingAccessControl); + } + + #[test] + pub fn cache_entry_matches() { + let relay_msg = relay_message(); + + // self matches + let entry = crate::access_control::cache::CacheEntry::from(&relay_msg); + assert!(entry.matches(&relay_msg)); + + // payload is ignored + let cloned = RelayMessage::new( + relay_msg.source().clone(), + relay_msg.destination().clone(), + relay_msg.local_message().clone().with_payload(vec![1]), + ); + assert!(entry.matches(&cloned)); + + // we check that if any field is different, the entry does not match + + // source + let cloned = RelayMessage::new( + Address::random_local(), + relay_msg.destination().clone(), + relay_msg.local_message().clone(), + ); + assert!(!entry.matches(&cloned)); + + // destination + let cloned = RelayMessage::new( + relay_msg.source().clone(), + Address::random_local(), + relay_msg.local_message().clone(), + ); + assert!(!entry.matches(&cloned)); + + // onward route + let cloned = RelayMessage::new( + relay_msg.source().clone(), + relay_msg.destination().clone(), + relay_msg + .local_message() + .clone() + .with_onward_route(route!["different"]), + ); + assert!(!entry.matches(&cloned)); + + // return route + let cloned = RelayMessage::new( + relay_msg.source().clone(), + relay_msg.destination().clone(), + relay_msg + .local_message() + .clone() + .with_return_route(route!["different"]), + ); + assert!(!entry.matches(&cloned)); + + // local info + let cloned = RelayMessage::new( + relay_msg.source().clone(), + relay_msg.destination().clone(), + relay_msg + .local_message() + .clone() + .with_local_info(vec![LocalInfo::new("type".into(), vec![1, 2, 3, 4])]), + ); + assert!(!entry.matches(&cloned)); + } + + #[test] + pub fn cache_entry_is_expired() { + let entry = CacheEntry { + source: Address::random_local(), + destination: Address::random_local(), + onward_route: route!["onward"], + return_route: route!["return"], + local_info: vec![], + timestamp: Instant::now(), + }; + + // not expired + assert!(!entry.is_expired()); + + // expired + let entry = CacheEntry { + timestamp: Instant::now() - Duration::from_secs(CACHE_DURATION_SECS), + ..entry + }; + assert!(entry.is_expired()); + } +} From 12307cb1f02de4fa53112e87cd9921304375d9ce Mon Sep 17 00:00:00 2001 From: Davide Baldo Date: Wed, 20 Mar 2024 19:34:14 +0100 Subject: [PATCH 2/2] feat: added relay profiling and improved result readability --- Cargo.toml | 1 - tools/profile/README.md | 1 + tools/profile/portal.perf | 7 +++-- tools/profile/portal.valgrind.dhat | 7 +++-- tools/profile/portal_two_nodes.perf | 7 +++-- tools/profile/relay_portal.perf | 46 +++++++++++++++++++++++++++++ 6 files changed, 62 insertions(+), 7 deletions(-) create mode 100755 tools/profile/relay_portal.perf diff --git a/Cargo.toml b/Cargo.toml index 23a17e0e325..85edce3e7af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,6 @@ inherits = "test" debug = 1 strip = "none" inherits = "release" -force-frame-pointers = "yes" # compromise: minimal optimization on selected dependencies # to reduce cli bootstrap time by ~5x diff --git a/tools/profile/README.md b/tools/profile/README.md index 220da696169..7118015cc65 100644 --- a/tools/profile/README.md +++ b/tools/profile/README.md @@ -5,6 +5,7 @@ This directory contains tools for profiling ockam. Two scenarios for performance profiling: - `portal.perf` - local portal, within one node - `portal_two_nodes.perf` - two nodes, one inlet and outlet +- `relay_port.perf` - one node, one inlet and outlet passing through a relay And one scenario for heap profiling: - `portal.valgrind.dhat` - local portal, within one node diff --git a/tools/profile/portal.perf b/tools/profile/portal.perf index b271d6f3aa7..13fb0a275ec 100755 --- a/tools/profile/portal.perf +++ b/tools/profile/portal.perf @@ -12,9 +12,12 @@ fi set -e -cargo build --profile profiling -p ockam_command +if [ -z "${OCKAM}" ]; then + RUSTFLAGS="-C force-frame-pointers=yes" cargo build --profile profiling -p ockam_command + OCKAM=target/profiling/ockam +fi -OCKAM=target/profiling/ockam +"${OCKAM}" node delete portal -y >/dev/null 2>&1 || true export OCKAM_LOG_LEVEL=info perf record --call-graph dwarf -F 99 --output /tmp/ockam.perf -- "${OCKAM}" node create portal -f & perf_pid=$! diff --git a/tools/profile/portal.valgrind.dhat b/tools/profile/portal.valgrind.dhat index be003c3c670..f7f556bf2a7 100755 --- a/tools/profile/portal.valgrind.dhat +++ b/tools/profile/portal.valgrind.dhat @@ -12,9 +12,12 @@ fi set -e -cargo build --profile profiling -p ockam_command +if [ -z "${OCKAM}" ]; then + RUSTFLAGS="-C force-frame-pointers=yes" cargo build --profile profiling -p ockam_command + OCKAM=target/profiling/ockam +fi -OCKAM=target/profiling/ockam +"${OCKAM}" node delete portal -y >/dev/null 2>&1 || true export OCKAM_LOG_LEVEL=info valgrind --tool=dhat --trace-children=yes --dhat-out-file=/tmp/ockam.valgrind.dhat -- "${OCKAM}" node create portal diff --git a/tools/profile/portal_two_nodes.perf b/tools/profile/portal_two_nodes.perf index 7e97b5d77b3..f4e44207fdc 100755 --- a/tools/profile/portal_two_nodes.perf +++ b/tools/profile/portal_two_nodes.perf @@ -12,9 +12,12 @@ fi set -e -cargo build --profile profiling -p ockam_command +if [ -z "${OCKAM}" ]; then + RUSTFLAGS="-C force-frame-pointers=yes" cargo build --profile profiling -p ockam_command + OCKAM=target/profiling/ockam +fi -OCKAM=target/profiling/ockam +"${OCKAM}" node delete portal -y >/dev/null 2>&1 || true export OCKAM_LOG_LEVEL=info perf record --call-graph dwarf -F 99 --output /tmp/ockam.inlet.perf -- "${OCKAM}" node create inlet -f & perf record --call-graph dwarf -F 99 --output /tmp/ockam.outlet.perf -- "${OCKAM}" node create outlet -f & diff --git a/tools/profile/relay_portal.perf b/tools/profile/relay_portal.perf new file mode 100755 index 00000000000..db150e899b6 --- /dev/null +++ b/tools/profile/relay_portal.perf @@ -0,0 +1,46 @@ +#!/bin/bash + +if ! [ -x "$(command -v iperf3)" ]; then + echo 'Error: iperf3 is not installed.' >&2 + exit 1 +fi + +if ! [ -x "$(command -v perf)" ]; then + echo 'Error: perf is not installed. perf is linux-specific, see dtrace for macos.' >&2 + exit 1 +fi + +set -e + +if [ -z "${OCKAM}" ]; then + RUSTFLAGS="-C force-frame-pointers=yes" cargo build --profile profiling -p ockam_command + OCKAM=target/profiling/ockam +fi + +"${OCKAM}" node delete portal -y >/dev/null 2>&1 || true +export OCKAM_LOG_LEVEL=info +perf record --call-graph dwarf -F 99 --output /tmp/ockam.perf -- "${OCKAM}" node create portal -f & +perf_pid=$! + +sleep 1 +"${OCKAM}" tcp-outlet create --to 5000 --at portal +"${OCKAM}" relay create --to portal +"${OCKAM}" tcp-inlet create --from 8000 --to /project/default/service/forward_to_default/secure/api/service/outlet --at portal + +iperf3 --server --port 5000 --one-off & +iperf3_server_pid=$! + +sleep 0.3 # wait for server to start +iperf3 --zerocopy --client 127.0.0.1 --port 8000 --time 60 + +kill ${iperf3_server_pid} +"${OCKAM}" node delete portal -y + +echo "Waiting for perf to finish writing /tmp/ockam.perf..." +wait ${perf_pid} + +echo "Converting perf file to firefox profiler format, could take up to few minutes..." +perf script -F +pid --input /tmp/ockam.perf > /tmp/ockam.perf.firefox + +echo "You can use firefox web profiler to open /tmp/ockam.perf.firefox file." +echo "https://profiler.firefox.com/"