diff --git a/Cargo.lock b/Cargo.lock index 7bd499a2..39a4ac1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1868,10 +1868,10 @@ dependencies = [ "handlebars", "http 1.1.0", "http-serde-ext", + "integrationos-cache", "integrationos-domain", "js-sandbox-ios", "jsonpath_lib", - "moka", "mongodb", "reqwest", "serde", diff --git a/integrationos-api/src/config.rs b/integrationos-api/src/config.rs index 3f2ecbf7..fe511ab7 100644 --- a/integrationos-api/src/config.rs +++ b/integrationos-api/src/config.rs @@ -4,7 +4,7 @@ use std::{ }; use envconfig::Envconfig; -use integrationos_domain::cache::CacheConfig as RedisConfig; +use integrationos_domain::cache::CacheConfig; use integrationos_domain::{ database::DatabaseConfig, openai::OpenAiConfig, secrets::SecretsConfig, }; @@ -32,6 +32,12 @@ pub struct Config { pub connection_definition_cache_ttl_secs: u64, #[envconfig(from = "CONNECTION_OAUTH_DEFINITION_CACHE_TTL_SECS", default = "120")] pub connection_oauth_definition_cache_ttl_secs: u64, + #[envconfig(from = "CONNECTION_MODEL_SCHEMA_TTL_SECS", default = "3600")] + pub connection_model_schema_cache_ttl_secs: u64, + #[envconfig(from = "CONNECTION_MODEL_DEFINITION_CACHE_TTL_SECS", default = "3600")] + pub connection_model_definition_cache_ttl_secs: u64, + #[envconfig(from = "SECRET_CACHE_TTL_SECS", default = "3600")] + pub secret_cache_ttl_secs: u64, #[envconfig( from = "EVENT_ACCESS_PASSWORD", default = "32KFFT_i4UpkJmyPwY2TGzgHpxfXs7zS" @@ -77,7 +83,7 @@ pub struct Config { #[envconfig(nested = true)] pub openai_config: OpenAiConfig, #[envconfig(nested = true)] - pub redis_config: RedisConfig, + pub cache_config: CacheConfig, #[envconfig(from = "RATE_LIMIT_ENABLED", default = "true")] pub rate_limit_enabled: bool, } @@ -140,7 +146,7 @@ impl Display for Config { writeln!(f, "{}", self.headers)?; writeln!(f, "{}", self.db_config)?; writeln!(f, "{}", self.openai_config)?; - writeln!(f, "{}", self.redis_config)?; + writeln!(f, "{}", self.cache_config)?; writeln!(f, "RATE_LIMIT_ENABLED: {}", self.rate_limit_enabled) } } diff --git a/integrationos-api/src/endpoints/mod.rs b/integrationos-api/src/endpoints/mod.rs index 1f35c355..8ed8ebc9 100644 --- a/integrationos-api/src/endpoints/mod.rs +++ b/integrationos-api/src/endpoints/mod.rs @@ -12,7 +12,7 @@ use axum::{ }; use bson::{doc, SerializerOptions}; use http::{HeaderMap, HeaderValue, StatusCode}; -use integrationos_cache::local::connection_cache::ConnectionCache; +use integrationos_cache::local::connection_cache::ConnectionCacheArcStrHeaderKey; use integrationos_domain::{ algebra::MongoStore, event_access::EventAccess, ApplicationError, Connection, IntegrationOSError, InternalError, OAuth, Store, @@ -337,11 +337,11 @@ async fn get_connection( access: &EventAccess, connection_key: &HeaderValue, stores: &AppStores, - cache: &ConnectionCache, + cache: &ConnectionCacheArcStrHeaderKey, ) -> Result, IntegrationOSError> { let connection = cache .get_or_insert_with_filter( - &(access.ownership.id.clone(), connection_key.clone()), + (access.ownership.id.clone(), connection_key.clone()), stores.connection.clone(), doc! { "key": connection_key.to_str().map_err(|_| { diff --git a/integrationos-api/src/middleware/extractor.rs b/integrationos-api/src/middleware/extractor.rs index 1097fe72..21480172 100644 --- a/integrationos-api/src/middleware/extractor.rs +++ b/integrationos-api/src/middleware/extractor.rs @@ -34,13 +34,13 @@ impl RateLimiter { return Err(anyhow::anyhow!("Rate limiting is disabled")); }; - let mut redis = RedisCache::new(&state.config.redis_config) + let mut redis = RedisCache::new(&state.config.cache_config) .await .with_context(|| "Could not connect to redis")?; let (tx, mut rx) = channel::<(Arc, oneshot::Sender)>(1024); - let throughput_key = state.config.redis_config.api_throughput_key.clone(); + let throughput_key = state.config.cache_config.api_throughput_key.clone(); tokio::spawn(async move { while let Some((id, tx)) = rx.recv().await { diff --git a/integrationos-api/src/server.rs b/integrationos-api/src/server.rs index c5bae233..aaa83629 100644 --- a/integrationos-api/src/server.rs +++ b/integrationos-api/src/server.rs @@ -9,7 +9,8 @@ use crate::{ use anyhow::{anyhow, Context, Result}; use axum::Router; use integrationos_cache::local::{ - connection_cache::ConnectionCache, connection_definition_cache::ConnectionDefinitionCache, + connection_cache::ConnectionCacheArcStrHeaderKey, + connection_definition_cache::ConnectionDefinitionCache, connection_oauth_definition_cache::ConnectionOAuthDefinitionCache, event_access_cache::EventAccessCache, }; @@ -26,7 +27,7 @@ use integrationos_domain::{ stage::Stage, Connection, Event, Pipeline, PlatformData, Store, Transaction, }; -use integrationos_unified::unified::UnifiedDestination; +use integrationos_unified::unified::{UnifiedCacheTTLs, UnifiedDestination}; use mongodb::{options::UpdateOptions, Client, Database}; use segment::{AutoBatcher, Batcher, HttpClient}; use std::{sync::Arc, time::Duration}; @@ -64,7 +65,7 @@ pub struct AppState { pub openapi_data: OpenAPIData, pub http_client: reqwest::Client, pub event_access_cache: EventAccessCache, - pub connections_cache: ConnectionCache, + pub connections_cache: ConnectionCacheArcStrHeaderKey, pub connection_definitions_cache: ConnectionDefinitionCache, pub connection_oauth_definitions_cache: ConnectionOAuthDefinitionCache, pub secrets_client: Arc, @@ -117,6 +118,14 @@ impl Server { config.db_config.clone(), config.cache_size, secrets_client.clone(), + UnifiedCacheTTLs { + connection_cache_ttl_secs: config.connection_cache_ttl_secs, + connection_model_schema_cache_ttl_secs: config + .connection_model_schema_cache_ttl_secs, + connection_model_definition_cache_ttl_secs: config + .connection_model_definition_cache_ttl_secs, + secret_cache_ttl_secs: config.secret_cache_ttl_secs, + }, ) .await .with_context(|| "Could not initialize extractor caller")?; @@ -146,8 +155,10 @@ impl Server { let event_access_cache = EventAccessCache::new(config.cache_size, config.access_key_cache_ttl_secs); - let connections_cache = - ConnectionCache::new(config.cache_size, config.connection_cache_ttl_secs); + let connections_cache = ConnectionCacheArcStrHeaderKey::create( + config.cache_size, + config.connection_cache_ttl_secs, + ); let connection_definitions_cache = ConnectionDefinitionCache::new( config.cache_size, config.connection_definition_cache_ttl_secs, diff --git a/integrationos-api/tests/api_tests/test_server/test_core.rs b/integrationos-api/tests/api_tests/test_server/test_core.rs index d5ee52f8..0bf0377f 100644 --- a/integrationos-api/tests/api_tests/test_server/test_core.rs +++ b/integrationos-api/tests/api_tests/test_server/test_core.rs @@ -39,7 +39,7 @@ impl TestCore { let mut config = EventCoreConfig::init_from_hashmap(&HashMap::from([])).unwrap(); config.db = api_config.db_config.clone(); - config.redis = gateway_config.redis.clone(); + config.cache = gateway_config.redis.clone(); let control_store = Arc::new( MongoControlDataStore::new(&config, secrets_client) @@ -55,7 +55,7 @@ impl TestCore { control_data_store: control_store.clone(), }; - let event_handler = EventHandler::new(config.redis.clone(), control_store, context_store) + let event_handler = EventHandler::new(config.cache.clone(), control_store, context_store) .await .unwrap(); diff --git a/integrationos-cache/src/local/connection_cache.rs b/integrationos-cache/src/local/connection_cache.rs index c889b452..ef0d8016 100644 --- a/integrationos-cache/src/local/connection_cache.rs +++ b/integrationos-cache/src/local/connection_cache.rs @@ -3,14 +3,16 @@ use http::HeaderValue; use integrationos_domain::{Connection, IntegrationOSError, MongoStore, Unit}; use moka::future::Cache; use mongodb::bson::Document; +use std::fmt::Debug; +use std::hash::Hash; use std::{sync::Arc, time::Duration}; #[derive(Clone)] -pub struct ConnectionCache { - inner: Arc, HeaderValue), Connection>>, +pub struct ConnectionCacheForKey { + inner: Arc>, } -impl ConnectionCache { +impl ConnectionCacheForKey { pub fn new(size: u64, ttl: u64) -> Self { Self { inner: Arc::new( @@ -24,31 +26,40 @@ impl ConnectionCache { pub async fn get_or_insert_with_filter( &self, - key: &(Arc, HeaderValue), + key: K, store: MongoStore, filter: Document, ) -> Result { self.inner - .get_or_insert_with_filter(key, store, filter) + .get_or_insert_with_filter(&key, store, filter) .await } - pub async fn get( - &self, - key: &(Arc, HeaderValue), - ) -> Result, IntegrationOSError> { - self.inner.get(key).await + pub async fn get(&self, key: K) -> Result, IntegrationOSError> { + self.inner.get(&key).await } - pub async fn set( - &self, - key: &(Arc, HeaderValue), - value: &Connection, - ) -> Result { - self.inner.set(key, value).await + pub async fn set(&self, key: K, value: &Connection) -> Result { + self.inner.set(&key, value).await + } + + pub async fn remove(&self, key: K) -> Result { + self.inner.remove(&key).await + } +} + +pub type ConnectionCacheArcStrKey = ConnectionCacheForKey>; + +impl ConnectionCacheArcStrKey { + pub fn create(size: u64, ttl: u64) -> ConnectionCacheForKey> { + ConnectionCacheForKey::new(size, ttl) } +} + +pub type ConnectionCacheArcStrHeaderKey = ConnectionCacheForKey<(Arc, HeaderValue)>; - pub async fn remove(&self, key: &(Arc, HeaderValue)) -> Result { - self.inner.remove(key).await +impl ConnectionCacheArcStrHeaderKey { + pub fn create(size: u64, ttl: u64) -> ConnectionCacheForKey<(Arc, HeaderValue)> { + ConnectionCacheForKey::new(size, ttl) } } diff --git a/integrationos-cache/src/local/connection_model_definition_cache.rs b/integrationos-cache/src/local/connection_model_definition_cache.rs index 89e759fd..86a9f870 100644 --- a/integrationos-cache/src/local/connection_model_definition_cache.rs +++ b/integrationos-cache/src/local/connection_model_definition_cache.rs @@ -1,24 +1,27 @@ use crate::LocalCacheExt; +use futures::Future; use integrationos_domain::{ - connection_model_definition::ConnectionModelDefinition, Id, IntegrationOSError, MongoStore, - Unit, + connection_model_definition::ConnectionModelDefinition, destination::Destination, Id, + IntegrationOSError, MongoStore, Unit, }; use moka::future::Cache; use mongodb::bson::Document; -use std::{sync::Arc, time::Duration}; +use std::{fmt::Debug, hash::Hash, sync::Arc, time::Duration}; #[derive(Clone)] -pub struct ConnectionModelDefinitionCache { - inner: Arc>, +pub struct ConnectionModelDefinitionCacheForKey< + K: Clone + Send + Sync + Eq + Hash + Debug + 'static, +> { + inner: Arc>, } -impl ConnectionModelDefinitionCache { - pub fn new(size: u64) -> Self { +impl ConnectionModelDefinitionCacheForKey { + pub fn new(size: u64, ttl: u64) -> Self { Self { inner: Arc::new( Cache::builder() .max_capacity(size) - .time_to_live(Duration::from_secs(120)) + .time_to_live(Duration::from_secs(ttl)) .build(), ), } @@ -26,31 +29,70 @@ impl ConnectionModelDefinitionCache { pub async fn get_or_insert_with_filter( &self, - key: &Id, + key: K, store: MongoStore, filter: Document, ) -> Result { self.inner - .get_or_insert_with_filter(key, store, filter) + .get_or_insert_with_filter(&key, store, filter) .await } + pub async fn get_or_insert_with_fn( + &self, + key: K, + fa: F, + ) -> Result + where + F: FnOnce() -> Fut, + Fut: Future>, + { + let result = self.inner.get(&key).await; + match result { + Ok(Some(value)) => Ok(value), + Ok(None) => { + let value = fa().await?; + self.inner.set(&key, &value).await?; + Ok(value) + } + Err(e) => Err(e), + } + } + pub async fn get( &self, - key: &Id, + key: K, ) -> Result, IntegrationOSError> { - self.inner.get(key).await + self.inner.get(&key).await } pub async fn set( &self, - key: &Id, + key: K, value: &ConnectionModelDefinition, ) -> Result { - self.inner.set(key, value).await + self.inner.set(&key, value).await } - pub async fn remove(&self, key: &Id) -> Result { - self.inner.remove(key).await + pub async fn remove(&self, key: K) -> Result { + self.inner.remove(&key).await + } +} + +#[derive(Clone)] +pub struct ConnectionModelDefinitionCacheIdKey; + +impl ConnectionModelDefinitionCacheIdKey { + pub fn create(size: u64, ttl: u64) -> ConnectionModelDefinitionCacheForKey { + ConnectionModelDefinitionCacheForKey::new(size, ttl) + } +} + +pub type ConnectionModelDefinitionDestinationKey = + ConnectionModelDefinitionCacheForKey; + +impl ConnectionModelDefinitionDestinationKey { + pub fn create(size: u64, ttl: u64) -> ConnectionModelDefinitionCacheForKey { + ConnectionModelDefinitionCacheForKey::new(size, ttl) } } diff --git a/integrationos-cache/src/local/connection_model_schema_cache.rs b/integrationos-cache/src/local/connection_model_schema_cache.rs new file mode 100644 index 00000000..7f7c0258 --- /dev/null +++ b/integrationos-cache/src/local/connection_model_schema_cache.rs @@ -0,0 +1,73 @@ +use crate::LocalCacheExt; +use integrationos_domain::{ + connection_model_schema::ConnectionModelSchema, ApplicationError, IntegrationOSError, + MongoStore, Unit, +}; +use moka::future::Cache; +use mongodb::{bson::Document, options::FindOneOptions}; +use std::{sync::Arc, time::Duration}; + +type ConnectionModelSchemaKey = (Arc, Arc); + +#[derive(Clone)] +pub struct ConnectionModelSchemaCache { + inner: Arc>, +} + +impl ConnectionModelSchemaCache { + pub fn new(size: u64, ttl: u64) -> Self { + Self { + inner: Arc::new( + Cache::builder() + .max_capacity(size) + .time_to_live(Duration::from_secs(ttl)) + .build(), + ), + } + } + + pub async fn get_or_insert_with_filter( + &self, + key: &ConnectionModelSchemaKey, + store: MongoStore, + filter: Document, + options: Option, + ) -> Result { + match self.get(key).await? { + Some(entry) => { + tracing::debug!("Cache hit for key: {:?}", key); + Ok(entry) + } + None => { + tracing::debug!("Cache miss for key: {:?}", key); + let value = store.collection.find_one(filter, options).await?; + if let Some(value) = value { + self.set(key, &value).await?; + Ok(value) + } else { + tracing::warn!("Value with id {:?} not found", key); + Err(ApplicationError::not_found("Value not found", None)) + } + } + } + } + + pub async fn get( + &self, + key: &ConnectionModelSchemaKey, + ) -> Result, IntegrationOSError> { + self.inner.get(key).await + } + + pub async fn set( + &self, + key: &ConnectionModelSchemaKey, + value: &ConnectionModelSchema, + ) -> Result { + self.inner.set(key, value).await + } + + pub async fn remove(&self, key: &ConnectionModelSchemaKey) -> Result { + self.inner.remove(key).await + } +} diff --git a/integrationos-cache/src/local/mod.rs b/integrationos-cache/src/local/mod.rs index 730867e6..0356450e 100644 --- a/integrationos-cache/src/local/mod.rs +++ b/integrationos-cache/src/local/mod.rs @@ -1,8 +1,10 @@ pub mod connection_cache; pub mod connection_definition_cache; pub mod connection_model_definition_cache; +pub mod connection_model_schema_cache; pub mod connection_oauth_definition_cache; pub mod event_access_cache; +pub mod secrets_cache; use crate::LocalCacheExt; use integrationos_domain::{IntegrationOSError, Unit}; diff --git a/integrationos-cache/src/local/secrets_cache.rs b/integrationos-cache/src/local/secrets_cache.rs new file mode 100644 index 00000000..9ca14cec --- /dev/null +++ b/integrationos-cache/src/local/secrets_cache.rs @@ -0,0 +1,74 @@ +use crate::LocalCacheExt; +use futures::Future; +use integrationos_domain::{Connection, IntegrationOSError, InternalError, MongoStore, Unit}; +use moka::future::Cache; +use mongodb::bson::Document; +use serde_json::Value; +use std::{sync::Arc, time::Duration}; + +#[derive(Clone)] +pub struct SecretCache { + inner: Arc>, +} + +impl SecretCache { + pub fn new(size: u64, ttl: u64) -> Self { + Self { + inner: Arc::new( + Cache::builder() + .max_capacity(size) + .time_to_live(Duration::from_secs(ttl)) + .build(), + ), + } + } + + pub fn max_capacity(&self) -> u64 { + self.inner.policy().max_capacity().unwrap_or_default() + } + + pub async fn get_or_insert_with_filter( + &self, + _: &Connection, + _: MongoStore, + _: Document, + ) -> Result { + Err(InternalError::key_not_found( + "The method you are trying to use is not implemented for this cache", + None, + )) + } + + pub async fn get_or_insert_with_fn( + &self, + key: Connection, + fa: F, + ) -> Result + where + F: FnOnce() -> Fut, + Fut: Future>, + { + let result = self.inner.get(&key).await; + match result { + Ok(Some(value)) => Ok(value), + Ok(None) => { + let value = fa().await?; + self.inner.set(&key, &value).await?; + Ok(value) + } + Err(e) => Err(e), + } + } + + pub async fn get(&self, key: &Connection) -> Result, IntegrationOSError> { + self.inner.get(key).await + } + + pub async fn set(&self, key: &Connection, value: &Value) -> Result { + self.inner.set(key, value).await + } + + pub async fn remove(&self, key: &Connection) -> Result { + self.inner.remove(key).await + } +} diff --git a/integrationos-event/src/config.rs b/integrationos-event/src/config.rs index c8f9432d..69dc7125 100644 --- a/integrationos-event/src/config.rs +++ b/integrationos-event/src/config.rs @@ -1,7 +1,5 @@ use envconfig::Envconfig; -use integrationos_domain::{ - cache::CacheConfig as RedisConfig, database::DatabaseConfig, secrets::SecretsConfig, -}; +use integrationos_domain::{cache::CacheConfig, database::DatabaseConfig, secrets::SecretsConfig}; use std::fmt::{Display, Formatter}; #[derive(Envconfig, Clone)] // Intentionally no Debug so secret is not printed @@ -17,9 +15,17 @@ pub struct EventCoreConfig { #[envconfig(nested = true)] pub secrets_config: SecretsConfig, #[envconfig(nested = true)] - pub redis: RedisConfig, + pub cache: CacheConfig, #[envconfig(nested = true)] pub db: DatabaseConfig, + #[envconfig(from = "CONNECTION_CACHE_TTL_SECS", default = "3600")] + pub connection_cache_ttl_secs: u64, + #[envconfig(from = "CONNECTION_MODEL_SCHEMA_TTL_SECS", default = "3600")] + pub connection_model_schema_cache_ttl_secs: u64, + #[envconfig(from = "CONNECTION_MODEL_DEFINITION_CACHE_TTL_SECS", default = "3600")] + pub connection_model_definition_cache_ttl_secs: u64, + #[envconfig(from = "SECRET_CACHE_TTL_SECS", default = "3600")] + pub secret_cache_ttl_secs: u64, } impl Display for EventCoreConfig { @@ -33,7 +39,7 @@ impl Display for EventCoreConfig { self.fetch_google_auth_token )?; write!(f, "{}", self.secrets_config)?; - write!(f, "{}", self.redis)?; + write!(f, "{}", self.cache)?; write!(f, "{}", self.db) } } diff --git a/integrationos-event/src/main.rs b/integrationos-event/src/main.rs index 9df5dd4c..c6add131 100644 --- a/integrationos-event/src/main.rs +++ b/integrationos-event/src/main.rs @@ -70,7 +70,7 @@ async fn main() -> Result<()> { }; let event_handler = - EventHandler::new(config.redis, control_store.clone(), context_store).await?; + EventHandler::new(config.cache, control_store.clone(), context_store).await?; info!("Listening for events on redis..."); let sync_pair = Arc::new((Mutex::new(0u64), Condvar::new())); diff --git a/integrationos-event/src/mongo_control_data_store.rs b/integrationos-event/src/mongo_control_data_store.rs index 43aa59cc..a34886c3 100644 --- a/integrationos-event/src/mongo_control_data_store.rs +++ b/integrationos-event/src/mongo_control_data_store.rs @@ -18,7 +18,7 @@ use integrationos_domain::{ middleware::Middleware, Connection, Event, Pipeline, Store, }; -use integrationos_unified::unified::UnifiedDestination; +use integrationos_unified::unified::{UnifiedCacheTTLs, UnifiedDestination}; use moka::future::Cache; use mongodb::{options::ClientOptions, Client}; use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; @@ -111,6 +111,14 @@ impl MongoControlDataStore { config.db.clone(), config.cache_size, secrets_client, + UnifiedCacheTTLs { + connection_cache_ttl_secs: config.connection_cache_ttl_secs, + connection_model_definition_cache_ttl_secs: config + .connection_model_definition_cache_ttl_secs, + connection_model_schema_cache_ttl_secs: config + .connection_model_schema_cache_ttl_secs, + secret_cache_ttl_secs: config.secret_cache_ttl_secs, + }, ) .await?, }) diff --git a/integrationos-unified/Cargo.toml b/integrationos-unified/Cargo.toml index 2a867c1a..e7713eda 100644 --- a/integrationos-unified/Cargo.toml +++ b/integrationos-unified/Cargo.toml @@ -8,13 +8,13 @@ edition = "2021" jsonpath_lib = "0.3.0" bson = "2.9.0" chrono = { version = "0.4.32", features = ["serde"] } +integrationos-cache = { path = "../integrationos-cache" } integrationos-domain = { path = "../integrationos-domain" } futures = "0.3.30" handlebars = { version = "4.4.0" } http = "1.1.0" http-serde-ext = "1.0.2" js-sandbox-ios = "0.1.0" -moka.workspace = true mongodb = "2.8.0" reqwest = { version = "0.12.3", features = [ "json", diff --git a/integrationos-unified/src/unified.rs b/integrationos-unified/src/unified.rs index ea578634..229ecb18 100644 --- a/integrationos-unified/src/unified.rs +++ b/integrationos-unified/src/unified.rs @@ -10,6 +10,11 @@ use chrono::Utc; use futures::{future::join_all, join, FutureExt}; use handlebars::Handlebars; use http::{HeaderMap, HeaderName, HeaderValue, Response, StatusCode}; +use integrationos_cache::local::{ + connection_cache::ConnectionCacheArcStrKey, + connection_model_definition_cache::ConnectionModelDefinitionDestinationKey, + connection_model_schema_cache::ConnectionModelSchemaCache, secrets_cache::SecretCache, +}; use integrationos_domain::{ api_model_config::{ModelPaths, RequestModelPaths, ResponseModelPaths}, client::caller_client::CallerClient, @@ -27,7 +32,6 @@ use integrationos_domain::{ ApplicationError, Connection, ErrorMeta, IntegrationOSError, Store, }; use js_sandbox_ios::Script; -use moka::future::Cache; use mongodb::{ options::{Collation, CollationStrength, FindOneOptions}, Client, @@ -47,38 +51,51 @@ thread_local! { #[derive(Clone)] pub struct UnifiedDestination { - pub connections_cache: Cache, Arc>, + pub connections_cache: ConnectionCacheArcStrKey, pub connections_store: MongoStore, - pub connection_model_definitions_cache: Cache>, - pub connection_model_definitions_destination_cache: - Cache>, + pub connection_model_definitions_cache: ConnectionModelDefinitionDestinationKey, pub connection_model_definitions_store: MongoStore, - pub connection_model_schemas_cache: Cache<(Arc, Arc), Arc>, + pub connection_model_schemas_cache: ConnectionModelSchemaCache, pub connection_model_schemas_store: MongoStore, pub secrets_client: Arc, - pub secrets_cache: Cache>, + pub secrets_cache: SecretCache, pub http_client: reqwest::Client, pub renderer: Option>>>, } +pub struct UnifiedCacheTTLs { + pub connection_cache_ttl_secs: u64, + pub connection_model_definition_cache_ttl_secs: u64, + pub connection_model_schema_cache_ttl_secs: u64, + pub secret_cache_ttl_secs: u64, +} + impl UnifiedDestination { + // TODO: Pass this ttls values as parameters pub async fn new( - config: DatabaseConfig, + db_config: DatabaseConfig, cache_size: u64, secrets_client: Arc, + cache_ttls: UnifiedCacheTTLs, ) -> Result { let http_client = reqwest::Client::new(); - let connections_cache = Cache::new(cache_size); - let connection_model_definitions_cache = Cache::new(cache_size); - let connection_model_definitions_destination_cache = Cache::new(cache_size); - let connection_model_schemas_cache = Cache::new(cache_size); - let secrets_cache = Cache::new(cache_size); + let connections_cache = + ConnectionCacheArcStrKey::new(cache_size, cache_ttls.connection_cache_ttl_secs); + let connection_model_definitions_cache = ConnectionModelDefinitionDestinationKey::create( + cache_size, + cache_ttls.connection_model_definition_cache_ttl_secs, + ); + let connection_model_schemas_cache = ConnectionModelSchemaCache::new( + cache_size, + cache_ttls.connection_model_schema_cache_ttl_secs, + ); + let secrets_cache = SecretCache::new(cache_size, cache_ttls.secret_cache_ttl_secs); - let client = Client::with_uri_str(&config.control_db_url) + let client = Client::with_uri_str(&db_config.control_db_url) .await .map_err(|e| InternalError::connection_error(&e.to_string(), None))?; - let db = client.database(&config.control_db_name); + let db = client.database(&db_config.control_db_name); let connections_store = MongoStore::new(&db, &Store::Connections).await?; let connection_model_definitions_store = @@ -90,7 +107,6 @@ impl UnifiedDestination { connections_cache, connections_store, connection_model_definitions_cache, - connection_model_definitions_destination_cache, connection_model_definitions_store, connection_model_schemas_cache, connection_model_schemas_store, @@ -178,10 +194,10 @@ impl UnifiedDestination { pub async fn execute_model_definition( &self, - config: &Arc, + config: &ConnectionModelDefinition, headers: HeaderMap, query_params: &HashMap, - secret: &Arc, + secret: &Value, context: Option>, ) -> Result { let template_name = config.id.to_string(); @@ -251,30 +267,32 @@ impl UnifiedDestination { let config_fut = self .connection_model_definitions_cache - .try_get_with_by_ref(&key, async { + .get_or_insert_with_fn(key.clone(), || async { match self.get_connection_model_definition(&key).await { - Ok(Some(c)) => Ok(Arc::new(c)), + Ok(Some(c)) => Ok(c), Ok(None) => Err(InternalError::key_not_found("model definition", None)), Err(e) => Err(InternalError::connection_error(e.message().as_ref(), None)), } }); - let secret_fut = self.secrets_cache.try_get_with_by_ref(&connection, async { - let secret_request = GetSecretRequest { - buildable_id: connection.ownership.id.to_string(), - id: connection.secrets_service_id.clone(), - }; - match self - .secrets_client - .decrypt(&secret_request) - .map(|v| Some(v).transpose()) - .await - { - Ok(Some(c)) => Ok(Arc::new(c)), - Ok(None) => Err(InternalError::key_not_found("secret", None)), - Err(e) => Err(InternalError::connection_error(e.message().as_ref(), None)), - } - }); + let secret_fut = + self.secrets_cache + .get_or_insert_with_fn(connection.as_ref().clone(), || async { + let secret_request = GetSecretRequest { + buildable_id: connection.ownership.id.to_string(), + id: connection.secrets_service_id.clone(), + }; + match self + .secrets_client + .decrypt(&secret_request) + .map(|v| Some(v).transpose()) + .await + { + Ok(Some(c)) => Ok(c), + Ok(None) => Err(InternalError::key_not_found("secret", None)), + Err(e) => Err(InternalError::connection_error(e.message().as_ref(), None)), + } + }); let Action::Unified { action: _, @@ -289,33 +307,26 @@ impl UnifiedDestination { }; let schema_key = (connection.platform.clone(), name.clone()); - let schema_fut = - self.connection_model_schemas_cache - .try_get_with_by_ref(&schema_key, async { - match self - .connection_model_schemas_store - .collection - .find_one( - doc! { - "connectionPlatform": connection.platform.as_ref(), - "mapping.commonModelName": name.as_ref(), - }, - FindOneOptions::builder() - .collation(Some( - Collation::builder() - .strength(CollationStrength::Secondary) - .locale("en") - .build(), - )) + let schema_fut = self + .connection_model_schemas_cache + .get_or_insert_with_filter( + &schema_key, + self.connection_model_schemas_store.clone(), + doc! { + "connectionPlatform": connection.platform.as_ref(), + "mapping.commonModelName": name.as_ref(), + }, + Some( + FindOneOptions::builder() + .collation(Some( + Collation::builder() + .strength(CollationStrength::Secondary) + .locale("en") .build(), - ) - .await - { - Ok(Some(c)) => Ok(Arc::new(c)), - Ok(None) => Err(InternalError::key_not_found("model schema", None)), - Err(e) => Err(InternalError::connection_error(&e.to_string(), None)), - } - }); + )) + .build(), + ), + ); let join_result = join!(config_fut, secret_fut, schema_fut); @@ -335,22 +346,22 @@ impl UnifiedDestination { id: schema_id, mapping, .. - } = cms.as_ref(); + } = cms; if let Some(id) = id { - let secret = Arc::make_mut(&mut secret); + let secret = &mut secret; if let Value::Object(sec) = secret { const ID: &str = "id"; sec.insert(ID.to_string(), Value::String(id.to_string())); } } - let crud_script_namespace = if self.secrets_cache.policy().max_capacity() == Some(0) { + let crud_script_namespace = if self.secrets_cache.max_capacity() == 0 { "$".to_string() + &uuid::Uuid::new_v4().simple().to_string() } else { config.id.to_string().replace([':', '-'], "_") }; - let schema_script_namespace = if self.secrets_cache.policy().max_capacity() == Some(0) { + let schema_script_namespace = if self.secrets_cache.max_capacity() == 0 { "$".to_string() + &uuid::Uuid::new_v4().simple().to_string() } else { schema_id.to_string().replace([':', '-'], "_") @@ -485,7 +496,7 @@ impl UnifiedDestination { query_params = res.query_params.unwrap_or_default(); - let secret = Arc::make_mut(&mut secret); + let secret = &mut secret; if let Value::Object(ref mut sec) = secret { if let Some(path_params) = res.path_params { sec.extend(path_params.into_iter().map(|(a, b)| (a, Value::String(b)))); @@ -943,25 +954,15 @@ impl UnifiedDestination { let connection = if let Some(connection) = connection { connection } else { - self.connections_cache - .try_get_with_by_ref(&destination.connection_key, async { - match self - .connections_store - .get_one(doc! { "key": destination.connection_key.as_ref() }) - .await - { - Ok(Some(c)) => Ok(Arc::new(c)), - Ok(None) => Err(InternalError::key_not_found("Connection", None)), - Err(e) => Err(InternalError::connection_error(e.message().as_ref(), None)), - } - }) - .await - .map_err(|e| { - InternalError::connection_error( - &e.to_string(), - Some(&destination.connection_key.clone()), + Arc::new( + self.connections_cache + .get_or_insert_with_filter( + destination.connection_key.clone(), + self.connections_store.clone(), + doc! { "key": destination.connection_key.as_ref() }, ) - })? + .await?, + ) }; let config = match self.get_connection_model_definition(destination).await { @@ -982,7 +983,7 @@ impl UnifiedDestination { let secret = self .secrets_cache - .try_get_with_by_ref(&connection, async { + .get_or_insert_with_fn(connection.as_ref().clone(), || async { let secret_request = GetSecretRequest { buildable_id: connection.ownership.id.to_string(), id: connection.secrets_service_id.clone(), @@ -993,7 +994,7 @@ impl UnifiedDestination { .map(|v| Some(v).transpose()) .await { - Ok(Some(c)) => Ok(Arc::new(c)), + Ok(Some(c)) => Ok(c), Ok(None) => Err(InternalError::key_not_found("Secrets", None)), Err(e) => Err(InternalError::connection_error(e.message().as_ref(), None)), }