Skip to content

Commit

Permalink
feat: utilizing integrationos cache wrapper on unified api (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Jun 3, 2024
1 parent c22d1af commit 3a7716d
Show file tree
Hide file tree
Showing 16 changed files with 379 additions and 145 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions integrationos-api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
};

use envconfig::Envconfig;
use integrationos_domain::cache::CacheConfig as RedisConfig;
use integrationos_domain::cache::CacheConfig;
use integrationos_domain::{
database::DatabaseConfig, openai::OpenAiConfig, secrets::SecretsConfig,
};
Expand Down Expand Up @@ -32,6 +32,12 @@ pub struct Config {
pub connection_definition_cache_ttl_secs: u64,
#[envconfig(from = "CONNECTION_OAUTH_DEFINITION_CACHE_TTL_SECS", default = "120")]
pub connection_oauth_definition_cache_ttl_secs: u64,
#[envconfig(from = "CONNECTION_MODEL_SCHEMA_TTL_SECS", default = "3600")]
pub connection_model_schema_cache_ttl_secs: u64,
#[envconfig(from = "CONNECTION_MODEL_DEFINITION_CACHE_TTL_SECS", default = "3600")]
pub connection_model_definition_cache_ttl_secs: u64,
#[envconfig(from = "SECRET_CACHE_TTL_SECS", default = "3600")]
pub secret_cache_ttl_secs: u64,
#[envconfig(
from = "EVENT_ACCESS_PASSWORD",
default = "32KFFT_i4UpkJmyPwY2TGzgHpxfXs7zS"
Expand Down Expand Up @@ -77,7 +83,7 @@ pub struct Config {
#[envconfig(nested = true)]
pub openai_config: OpenAiConfig,
#[envconfig(nested = true)]
pub redis_config: RedisConfig,
pub cache_config: CacheConfig,
#[envconfig(from = "RATE_LIMIT_ENABLED", default = "true")]
pub rate_limit_enabled: bool,
}
Expand Down Expand Up @@ -140,7 +146,7 @@ impl Display for Config {
writeln!(f, "{}", self.headers)?;
writeln!(f, "{}", self.db_config)?;
writeln!(f, "{}", self.openai_config)?;
writeln!(f, "{}", self.redis_config)?;
writeln!(f, "{}", self.cache_config)?;
writeln!(f, "RATE_LIMIT_ENABLED: {}", self.rate_limit_enabled)
}
}
Expand Down
6 changes: 3 additions & 3 deletions integrationos-api/src/endpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use axum::{
};
use bson::{doc, SerializerOptions};
use http::{HeaderMap, HeaderValue, StatusCode};
use integrationos_cache::local::connection_cache::ConnectionCache;
use integrationos_cache::local::connection_cache::ConnectionCacheArcStrHeaderKey;
use integrationos_domain::{
algebra::MongoStore, event_access::EventAccess, ApplicationError, Connection,
IntegrationOSError, InternalError, OAuth, Store,
Expand Down Expand Up @@ -337,11 +337,11 @@ async fn get_connection(
access: &EventAccess,
connection_key: &HeaderValue,
stores: &AppStores,
cache: &ConnectionCache,
cache: &ConnectionCacheArcStrHeaderKey,
) -> Result<Arc<Connection>, IntegrationOSError> {
let connection = cache
.get_or_insert_with_filter(
&(access.ownership.id.clone(), connection_key.clone()),
(access.ownership.id.clone(), connection_key.clone()),
stores.connection.clone(),
doc! {
"key": connection_key.to_str().map_err(|_| {
Expand Down
4 changes: 2 additions & 2 deletions integrationos-api/src/middleware/extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ impl RateLimiter {
return Err(anyhow::anyhow!("Rate limiting is disabled"));
};

let mut redis = RedisCache::new(&state.config.redis_config)
let mut redis = RedisCache::new(&state.config.cache_config)
.await
.with_context(|| "Could not connect to redis")?;

let (tx, mut rx) = channel::<(Arc<str>, oneshot::Sender<u64>)>(1024);

let throughput_key = state.config.redis_config.api_throughput_key.clone();
let throughput_key = state.config.cache_config.api_throughput_key.clone();

tokio::spawn(async move {
while let Some((id, tx)) = rx.recv().await {
Expand Down
21 changes: 16 additions & 5 deletions integrationos-api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use crate::{
use anyhow::{anyhow, Context, Result};
use axum::Router;
use integrationos_cache::local::{
connection_cache::ConnectionCache, connection_definition_cache::ConnectionDefinitionCache,
connection_cache::ConnectionCacheArcStrHeaderKey,
connection_definition_cache::ConnectionDefinitionCache,
connection_oauth_definition_cache::ConnectionOAuthDefinitionCache,
event_access_cache::EventAccessCache,
};
Expand All @@ -26,7 +27,7 @@ use integrationos_domain::{
stage::Stage,
Connection, Event, Pipeline, PlatformData, Store, Transaction,
};
use integrationos_unified::unified::UnifiedDestination;
use integrationos_unified::unified::{UnifiedCacheTTLs, UnifiedDestination};
use mongodb::{options::UpdateOptions, Client, Database};
use segment::{AutoBatcher, Batcher, HttpClient};
use std::{sync::Arc, time::Duration};
Expand Down Expand Up @@ -64,7 +65,7 @@ pub struct AppState {
pub openapi_data: OpenAPIData,
pub http_client: reqwest::Client,
pub event_access_cache: EventAccessCache,
pub connections_cache: ConnectionCache,
pub connections_cache: ConnectionCacheArcStrHeaderKey,
pub connection_definitions_cache: ConnectionDefinitionCache,
pub connection_oauth_definitions_cache: ConnectionOAuthDefinitionCache,
pub secrets_client: Arc<dyn CryptoExt + Sync + Send>,
Expand Down Expand Up @@ -117,6 +118,14 @@ impl Server {
config.db_config.clone(),
config.cache_size,
secrets_client.clone(),
UnifiedCacheTTLs {
connection_cache_ttl_secs: config.connection_cache_ttl_secs,
connection_model_schema_cache_ttl_secs: config
.connection_model_schema_cache_ttl_secs,
connection_model_definition_cache_ttl_secs: config
.connection_model_definition_cache_ttl_secs,
secret_cache_ttl_secs: config.secret_cache_ttl_secs,
},
)
.await
.with_context(|| "Could not initialize extractor caller")?;
Expand Down Expand Up @@ -146,8 +155,10 @@ impl Server {

let event_access_cache =
EventAccessCache::new(config.cache_size, config.access_key_cache_ttl_secs);
let connections_cache =
ConnectionCache::new(config.cache_size, config.connection_cache_ttl_secs);
let connections_cache = ConnectionCacheArcStrHeaderKey::create(
config.cache_size,
config.connection_cache_ttl_secs,
);
let connection_definitions_cache = ConnectionDefinitionCache::new(
config.cache_size,
config.connection_definition_cache_ttl_secs,
Expand Down
4 changes: 2 additions & 2 deletions integrationos-api/tests/api_tests/test_server/test_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl TestCore {
let mut config = EventCoreConfig::init_from_hashmap(&HashMap::from([])).unwrap();

config.db = api_config.db_config.clone();
config.redis = gateway_config.redis.clone();
config.cache = gateway_config.redis.clone();

let control_store = Arc::new(
MongoControlDataStore::new(&config, secrets_client)
Expand All @@ -55,7 +55,7 @@ impl TestCore {
control_data_store: control_store.clone(),
};

let event_handler = EventHandler::new(config.redis.clone(), control_store, context_store)
let event_handler = EventHandler::new(config.cache.clone(), control_store, context_store)
.await
.unwrap();

Expand Down
47 changes: 29 additions & 18 deletions integrationos-cache/src/local/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ use http::HeaderValue;
use integrationos_domain::{Connection, IntegrationOSError, MongoStore, Unit};
use moka::future::Cache;
use mongodb::bson::Document;
use std::fmt::Debug;
use std::hash::Hash;
use std::{sync::Arc, time::Duration};

#[derive(Clone)]
pub struct ConnectionCache {
inner: Arc<Cache<(Arc<str>, HeaderValue), Connection>>,
pub struct ConnectionCacheForKey<K: Clone + Send + Sync + Eq + Hash + Debug + 'static> {
inner: Arc<Cache<K, Connection>>,
}

impl ConnectionCache {
impl<K: Clone + Send + Sync + Eq + Hash + Debug + 'static> ConnectionCacheForKey<K> {
pub fn new(size: u64, ttl: u64) -> Self {
Self {
inner: Arc::new(
Expand All @@ -24,31 +26,40 @@ impl ConnectionCache {

pub async fn get_or_insert_with_filter(
&self,
key: &(Arc<str>, HeaderValue),
key: K,
store: MongoStore<Connection>,
filter: Document,
) -> Result<Connection, IntegrationOSError> {
self.inner
.get_or_insert_with_filter(key, store, filter)
.get_or_insert_with_filter(&key, store, filter)
.await
}

pub async fn get(
&self,
key: &(Arc<str>, HeaderValue),
) -> Result<Option<Connection>, IntegrationOSError> {
self.inner.get(key).await
pub async fn get(&self, key: K) -> Result<Option<Connection>, IntegrationOSError> {
self.inner.get(&key).await
}

pub async fn set(
&self,
key: &(Arc<str>, HeaderValue),
value: &Connection,
) -> Result<Unit, IntegrationOSError> {
self.inner.set(key, value).await
pub async fn set(&self, key: K, value: &Connection) -> Result<Unit, IntegrationOSError> {
self.inner.set(&key, value).await
}

pub async fn remove(&self, key: K) -> Result<Unit, IntegrationOSError> {
self.inner.remove(&key).await
}
}

pub type ConnectionCacheArcStrKey = ConnectionCacheForKey<Arc<str>>;

impl ConnectionCacheArcStrKey {
pub fn create(size: u64, ttl: u64) -> ConnectionCacheForKey<Arc<str>> {
ConnectionCacheForKey::new(size, ttl)
}
}

pub type ConnectionCacheArcStrHeaderKey = ConnectionCacheForKey<(Arc<str>, HeaderValue)>;

pub async fn remove(&self, key: &(Arc<str>, HeaderValue)) -> Result<Unit, IntegrationOSError> {
self.inner.remove(key).await
impl ConnectionCacheArcStrHeaderKey {
pub fn create(size: u64, ttl: u64) -> ConnectionCacheForKey<(Arc<str>, HeaderValue)> {
ConnectionCacheForKey::new(size, ttl)
}
}
74 changes: 58 additions & 16 deletions integrationos-cache/src/local/connection_model_definition_cache.rs
Original file line number Diff line number Diff line change
@@ -1,56 +1,98 @@
use crate::LocalCacheExt;
use futures::Future;
use integrationos_domain::{
connection_model_definition::ConnectionModelDefinition, Id, IntegrationOSError, MongoStore,
Unit,
connection_model_definition::ConnectionModelDefinition, destination::Destination, Id,
IntegrationOSError, MongoStore, Unit,
};
use moka::future::Cache;
use mongodb::bson::Document;
use std::{sync::Arc, time::Duration};
use std::{fmt::Debug, hash::Hash, sync::Arc, time::Duration};

#[derive(Clone)]
pub struct ConnectionModelDefinitionCache {
inner: Arc<Cache<Id, ConnectionModelDefinition>>,
pub struct ConnectionModelDefinitionCacheForKey<
K: Clone + Send + Sync + Eq + Hash + Debug + 'static,
> {
inner: Arc<Cache<K, ConnectionModelDefinition>>,
}

impl ConnectionModelDefinitionCache {
pub fn new(size: u64) -> Self {
impl<K: Clone + Send + Sync + Eq + Hash + Debug + 'static> ConnectionModelDefinitionCacheForKey<K> {
pub fn new(size: u64, ttl: u64) -> Self {
Self {
inner: Arc::new(
Cache::builder()
.max_capacity(size)
.time_to_live(Duration::from_secs(120))
.time_to_live(Duration::from_secs(ttl))
.build(),
),
}
}

pub async fn get_or_insert_with_filter(
&self,
key: &Id,
key: K,
store: MongoStore<ConnectionModelDefinition>,
filter: Document,
) -> Result<ConnectionModelDefinition, IntegrationOSError> {
self.inner
.get_or_insert_with_filter(key, store, filter)
.get_or_insert_with_filter(&key, store, filter)
.await
}

pub async fn get_or_insert_with_fn<F, Fut>(
&self,
key: K,
fa: F,
) -> Result<ConnectionModelDefinition, IntegrationOSError>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<ConnectionModelDefinition, IntegrationOSError>>,
{
let result = self.inner.get(&key).await;
match result {
Ok(Some(value)) => Ok(value),
Ok(None) => {
let value = fa().await?;
self.inner.set(&key, &value).await?;
Ok(value)
}
Err(e) => Err(e),
}
}

pub async fn get(
&self,
key: &Id,
key: K,
) -> Result<Option<ConnectionModelDefinition>, IntegrationOSError> {
self.inner.get(key).await
self.inner.get(&key).await
}

pub async fn set(
&self,
key: &Id,
key: K,
value: &ConnectionModelDefinition,
) -> Result<Unit, IntegrationOSError> {
self.inner.set(key, value).await
self.inner.set(&key, value).await
}

pub async fn remove(&self, key: &Id) -> Result<Unit, IntegrationOSError> {
self.inner.remove(key).await
pub async fn remove(&self, key: K) -> Result<Unit, IntegrationOSError> {
self.inner.remove(&key).await
}
}

#[derive(Clone)]
pub struct ConnectionModelDefinitionCacheIdKey;

impl ConnectionModelDefinitionCacheIdKey {
pub fn create(size: u64, ttl: u64) -> ConnectionModelDefinitionCacheForKey<Id> {
ConnectionModelDefinitionCacheForKey::new(size, ttl)
}
}

pub type ConnectionModelDefinitionDestinationKey =
ConnectionModelDefinitionCacheForKey<Destination>;

impl ConnectionModelDefinitionDestinationKey {
pub fn create(size: u64, ttl: u64) -> ConnectionModelDefinitionCacheForKey<Destination> {
ConnectionModelDefinitionCacheForKey::new(size, ttl)
}
}
Loading

0 comments on commit 3a7716d

Please sign in to comment.