From 9432b59839580d1ec23cd81a4475db37daa1de63 Mon Sep 17 00:00:00 2001 From: Samuel <39674930+samgj18@users.noreply.github.com> Date: Fri, 12 Apr 2024 15:55:36 +0100 Subject: [PATCH] Migration to IOS v1.0.0 (#5) * Migration to IOS v1.0.0 * Removing failed imports on test --- Cargo.lock | 35 ++----- Cargo.toml | 7 +- api/Cargo.toml | 2 +- api/src/config.rs | 2 +- api/src/endpoints/common_enum.rs | 4 +- api/src/endpoints/common_model.rs | 7 +- api/src/endpoints/connection.rs | 28 +++--- api/src/endpoints/connection_definition.rs | 7 +- .../endpoints/connection_model_definition.rs | 5 +- api/src/endpoints/connection_model_schema.rs | 7 +- .../endpoints/connection_oauth_definition.rs | 17 ++-- api/src/endpoints/event_access.rs | 27 +++--- api/src/endpoints/events.rs | 16 ++-- api/src/endpoints/mod.rs | 77 +++++++-------- api/src/endpoints/oauth.rs | 16 +--- api/src/endpoints/openapi.rs | 15 ++- api/src/endpoints/passthrough.rs | 27 +++--- api/src/endpoints/pipeline.rs | 7 +- api/src/endpoints/transactions.rs | 16 ++-- api/src/endpoints/unified.rs | 46 ++++----- api/src/middleware/auth.rs | 4 +- api/src/middleware/rate_limiter.rs | 6 +- api/src/server.rs | 93 ++++++++++--------- api/tests/api_tests/test_server/mod.rs | 22 ++--- event-core/Cargo.toml | 3 +- event-core/src/config.rs | 5 +- event-core/src/dispatcher.rs | 16 ++-- event-core/src/event_handler.rs | 20 ++-- event-core/src/mocks/mock_context_store.rs | 10 +- event-core/src/mocks/mock_secret_service.rs | 4 +- event-core/src/mongo_context_store.rs | 8 +- event-core/src/mongo_control_data_store.rs | 36 +++---- event-core/src/store.rs | 6 +- event-core/tests/mock_destination.rs | 6 +- event-core/tests/mock_storage.rs | 49 +++++----- gateway/Cargo.toml | 4 +- gateway/src/config.rs | 10 +- gateway/src/finalizer.rs | 18 ++-- google-token-fetcher/Cargo.toml | 11 --- google-token-fetcher/src/lib.rs | 39 -------- redis-retry/Cargo.toml | 13 --- redis-retry/src/config.rs | 38 -------- redis-retry/src/lib.rs | 81 ---------------- watchdog/Cargo.toml | 17 ++-- watchdog/src/config.rs | 8 +- watchdog/src/main.rs | 18 ++-- 46 files changed, 344 insertions(+), 569 deletions(-) delete mode 100644 google-token-fetcher/Cargo.toml delete mode 100644 google-token-fetcher/src/lib.rs delete mode 100644 redis-retry/Cargo.toml delete mode 100644 redis-retry/src/config.rs delete mode 100644 redis-retry/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 219c6e84..e1b15af1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,7 +112,7 @@ dependencies = [ "num_cpus", "openapiv3", "rand", - "redis-retry", + "redis", "reqwest", "segment", "semver 1.0.22", @@ -976,7 +976,6 @@ dependencies = [ "envconfig", "fake", "futures", - "google-token-fetcher", "handlebars", "http", "integrationos-domain", @@ -986,7 +985,7 @@ dependencies = [ "mockito", "moka", "mongodb", - "redis-retry", + "redis", "reqwest", "serde", "serde_json", @@ -1171,7 +1170,7 @@ dependencies = [ "integrationos-domain", "moka", "mongodb", - "redis-retry", + "redis", "serde", "serde_json", "tokio", @@ -1214,15 +1213,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" -[[package]] -name = "google-token-fetcher" -version = "0.1.0" -dependencies = [ - "anyhow", - "reqwest", - "serde", -] - [[package]] name = "h2" version = "0.3.24" @@ -1520,9 +1510,9 @@ dependencies = [ [[package]] name = "integrationos-domain" -version = "0.1.5" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddff1b83ab196f76666b13730747a8e9fc6d1953eb0f3fc70b8afa00ba246f0c" +checksum = "54b659a2663d9b21703fcac95895c05104289b25da1efe95de0c9a6c5ffe225b" dependencies = [ "aes", "anyhow", @@ -1550,6 +1540,7 @@ dependencies = [ "pin-project", "prost", "rand", + "redis", "reqwest", "semver 1.0.22", "serde", @@ -1560,6 +1551,7 @@ dependencies = [ "thiserror", "tokio", "tracing", + "tracing-subscriber", "uuid", ] @@ -2452,17 +2444,6 @@ dependencies = [ "url", ] -[[package]] -name = "redis-retry" -version = "0.1.0" -dependencies = [ - "anyhow", - "envconfig", - "futures-util", - "redis", - "tracing", -] - [[package]] name = "redox_syscall" version = "0.4.1" @@ -3770,7 +3751,7 @@ dependencies = [ "futures", "integrationos-domain", "mongodb", - "redis-retry", + "redis", "serde_json", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 625f6147..320b1f13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,9 +6,7 @@ members = [ "api", "event-core", "gateway", - "google-token-fetcher", - "redis-retry", - "watchdog", + "watchdog" ] [workspace.dependencies] @@ -38,7 +36,7 @@ futures-util = "0.3.28" handlebars = "4.4.0" http = "0.2.9" http-serde-ext = "0.1.8" -integrationos-domain = "0.1.5" +integrationos-domain = "1.0.0" js-sandbox-ios = "0.1.0" jsonpath_lib = "0.3.0" jsonwebtoken = "8.3.0" @@ -48,6 +46,7 @@ mongodb = "2.7.0" once_cell = "1.18.0" openapiv3 = { version = "2.0.0", features = ["skip_serializing_defaults"] } rand = "0.8.5" +redis = { version = "0.23.3", features = ["connection-manager", "tokio-comp"] } regex = "1.10.2" reqwest = { version = "0.11.22", features = [ "json", diff --git a/api/Cargo.toml b/api/Cargo.toml index 1897ac08..f1017211 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -33,7 +33,7 @@ mongodb.workspace = true num_cpus = "1" openapiv3.workspace = true rand.workspace = true -redis-retry = { path = "../redis-retry" } +redis.workspace = true reqwest.workspace = true segment = "0.2.3" semver.workspace = true diff --git a/api/src/config.rs b/api/src/config.rs index 3f37630b..496b2598 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -4,10 +4,10 @@ use std::{ }; use envconfig::Envconfig; +use integrationos_domain::cache::CacheConfig as RedisConfig; use integrationos_domain::common::{ claude::ClaudeConfig, database::DatabaseConfig, openai::OpenAiConfig, secrets::SecretsConfig, }; -use redis_retry::Config as RedisConfig; #[derive(Envconfig, Clone)] pub struct Config { diff --git a/api/src/endpoints/common_enum.rs b/api/src/endpoints/common_enum.rs index 3eb6dd2f..59cf5c71 100644 --- a/api/src/endpoints/common_enum.rs +++ b/api/src/endpoints/common_enum.rs @@ -1,12 +1,10 @@ use super::{ApiError, ReadResponse}; use crate::{internal_server_error, server::AppState, util::shape_mongo_filter}; - use axum::{ extract::{Query, State}, Json, }; -use integrationos_domain::{algebra::adapter::StoreAdapter, common_model::CommonEnum}; - +use integrationos_domain::{algebra::StoreExt, common_model::CommonEnum}; use shape_mongo_filter::DELETED_STR; use std::{collections::BTreeMap, sync::Arc}; use tokio::try_join; diff --git a/api/src/endpoints/common_model.rs b/api/src/endpoints/common_model.rs index a5490c0b..f663edbe 100644 --- a/api/src/endpoints/common_model.rs +++ b/api/src/endpoints/common_model.rs @@ -10,13 +10,12 @@ use axum::{ Router, }; use integrationos_domain::{ - algebra::adapter::StoreAdapter, + algebra::{MongoStore, StoreExt}, api_model_config::Lang, common::{ common_model::{CommonModel, Field}, event_access::EventAccess, json_schema::JsonSchema, - mongo::MongoDbStore, }, id::{prefix::IdPrefix, Id}, IntegrationOSError, @@ -113,7 +112,7 @@ impl CrudRequest for CreateRequest { record.sample = self.sample; } - fn get_store(stores: AppStores) -> MongoDbStore { + fn get_store(stores: AppStores) -> MongoStore { stores.common_model.clone() } } @@ -161,7 +160,7 @@ async fn as_json_schema(path: Path, state: State>) -> ApiResul async fn update_interface( interface: HashMap, record: &CommonModel, - cm_store: &MongoDbStore, + cm_store: &MongoStore, ) -> Result<(), IntegrationOSError> { match bson::to_bson(&interface) { Ok(interface) => { diff --git a/api/src/endpoints/connection.rs b/api/src/endpoints/connection.rs index 00763fe2..a4c64383 100644 --- a/api/src/endpoints/connection.rs +++ b/api/src/endpoints/connection.rs @@ -1,5 +1,11 @@ -use std::{collections::HashMap, sync::Arc}; - +use super::{delete, read, CrudRequest}; +use crate::{ + api_payloads::{DeleteResponse, ErrorResponse, UpdateResponse}, + bad_request, + endpoints::event_access::{generate_event_access, CreateEventAccessPayloadWithOwnership}, + internal_server_error, not_found, + server::{AppState, AppStores}, +}; use anyhow::{bail, Result}; use axum::{ extract::{Path, State}, @@ -11,11 +17,10 @@ use chrono::Utc; use convert_case::{Case, Casing}; use http::HeaderMap; use integrationos_domain::{ - algebra::adapter::StoreAdapter, + algebra::{MongoStore, StoreExt}, common::{ connection_definition::ConnectionDefinition, event_access::EventAccess, - mongo::MongoDbStore, record_metadata::RecordMetadata, settings::Settings, Connection, - Throughput, + record_metadata::RecordMetadata, settings::Settings, Connection, Throughput, }, id::{prefix::IdPrefix, Id}, }; @@ -23,19 +28,10 @@ use mongodb::bson::doc; use mongodb::bson::Regex; use serde::{Deserialize, Serialize}; use serde_json::Value; +use std::{collections::HashMap, sync::Arc}; use tracing::error; use validator::Validate; -use crate::{ - api_payloads::{DeleteResponse, ErrorResponse, UpdateResponse}, - bad_request, - endpoints::event_access::{generate_event_access, CreateEventAccessPayloadWithOwnership}, - internal_server_error, not_found, - server::{AppState, AppStores}, -}; - -use super::{delete, read, CrudRequest}; - pub fn get_router() -> Router> { Router::new() .route("/", post(create_connection)) @@ -107,7 +103,7 @@ impl CrudRequest for CreateConnectionPayload { unimplemented!() } - fn get_store(stores: AppStores) -> MongoDbStore { + fn get_store(stores: AppStores) -> MongoStore { stores.connection } diff --git a/api/src/endpoints/connection_definition.rs b/api/src/endpoints/connection_definition.rs index 87866f84..d6ae8b09 100644 --- a/api/src/endpoints/connection_definition.rs +++ b/api/src/endpoints/connection_definition.rs @@ -11,7 +11,7 @@ use axum::{ Json, Router, }; use integrationos_domain::{ - algebra::adapter::StoreAdapter, + algebra::{MongoStore, StoreExt}, common::{ api_model_config::AuthMethod, connection_definition::{ @@ -19,7 +19,6 @@ use integrationos_domain::{ FormDataItem, Frontend, Paths, Spec, }, event_access::EventAccess, - mongo::MongoDbStore, record_metadata::RecordMetadata, settings::Settings, }, @@ -333,14 +332,12 @@ impl CrudRequest for CreateRequest { record.record_metadata.active = self.active; } - fn get_store(stores: AppStores) -> MongoDbStore { + fn get_store(stores: AppStores) -> MongoStore { stores.connection_config } } impl CachedRequest for CreateRequest { - type Output = ConnectionDefinition; - fn get_cache( state: Arc, ) -> Arc>, Arc>>> { diff --git a/api/src/endpoints/connection_model_definition.rs b/api/src/endpoints/connection_model_definition.rs index 2dc5a73d..5b89250d 100644 --- a/api/src/endpoints/connection_model_definition.rs +++ b/api/src/endpoints/connection_model_definition.rs @@ -15,7 +15,7 @@ use bson::SerializerOptions; use chrono::Utc; use http::HeaderMap; use integrationos_domain::{ - algebra::adapter::StoreAdapter, + algebra::{MongoStore, StoreExt}, common::{ api_model_config::{ ApiModelConfig, AuthMethod, ModelPaths, ResponseBody, SamplesInput, SchemasInput, @@ -25,7 +25,6 @@ use integrationos_domain::{ TestConnection, TestConnectionState, }, event_access::EventAccess, - mongo::MongoDbStore, }, get_secret_request::GetSecretRequest, id::{prefix::IdPrefix, Id}, @@ -393,7 +392,7 @@ impl CrudRequest for CreateRequest { record.record_metadata.version = self.version; } - fn get_store(stores: AppStores) -> MongoDbStore { + fn get_store(stores: AppStores) -> MongoStore { stores.model_config.clone() } } diff --git a/api/src/endpoints/connection_model_schema.rs b/api/src/endpoints/connection_model_schema.rs index ad2c174d..aaf71a75 100644 --- a/api/src/endpoints/connection_model_schema.rs +++ b/api/src/endpoints/connection_model_schema.rs @@ -14,14 +14,13 @@ use axum::{ }; use http::StatusCode; use integrationos_domain::{ - algebra::adapter::StoreAdapter, + algebra::{MongoStore, StoreExt}, common::{ connection_model_schema::{ ConnectionModelSchema, Mappings, PublicConnectionModelSchema, SchemaPaths, }, event_access::EventAccess, json_schema::JsonSchema, - mongo::MongoDbStore, }, id::{prefix::IdPrefix, Id}, }; @@ -161,7 +160,7 @@ impl CrudRequest for PublicGetConnectionModelSchema { unimplemented!() } - fn get_store(stores: AppStores) -> MongoDbStore { + fn get_store(stores: AppStores) -> MongoStore { stores.public_model_schema.clone() } } @@ -230,7 +229,7 @@ impl CrudRequest for CreateRequest { record.mapping = self.mapping; } - fn get_store(stores: AppStores) -> MongoDbStore { + fn get_store(stores: AppStores) -> MongoStore { stores.model_schema.clone() } } diff --git a/api/src/endpoints/connection_oauth_definition.rs b/api/src/endpoints/connection_oauth_definition.rs index fce4a4e0..a7db6aa2 100644 --- a/api/src/endpoints/connection_oauth_definition.rs +++ b/api/src/endpoints/connection_oauth_definition.rs @@ -1,4 +1,4 @@ -use super::{create, delete, read, update, CachedRequest, CrudHook, CrudRequest, GetCache}; +use super::{create, delete, read, update, CachedRequest, CrudHook, CrudRequest, ReadResponse}; use crate::server::{AppState, AppStores}; use axum::{ routing::{patch, post}, @@ -6,20 +6,21 @@ use axum::{ }; use chrono::Utc; use integrationos_domain::{ + algebra::MongoStore, common::{ api_model_config::{ApiModelConfig, Compute, Function, Lang}, connection_oauth_definition::{ ComputeRequest, ConnectionOAuthDefinition, Frontend, OAuthApiConfig, OAuthCompute, }, event_access::EventAccess, - mongo::MongoDbStore, }, id::{prefix::IdPrefix, Id}, record_metadata::RecordMetadata, }; +use moka::future::Cache; use mongodb::bson::doc; use serde::{Deserialize, Serialize}; -use std::sync::Arc; +use std::{collections::BTreeMap, sync::Arc}; pub fn get_router() -> Router> { Router::new() @@ -166,7 +167,7 @@ impl CrudRequest for CreateRequest { record.record_metadata.updated = true; } - fn get_store(stores: AppStores) -> MongoDbStore { + fn get_store(stores: AppStores) -> MongoStore { stores.oauth_config.clone() } } @@ -198,15 +199,15 @@ impl CrudRequest for FrontendOauthConnectionDefinition { unimplemented!() } - fn get_store(stores: AppStores) -> MongoDbStore { + fn get_store(stores: AppStores) -> MongoStore { stores.frontend_oauth_config.clone() } } impl CachedRequest for FrontendOauthConnectionDefinition { - type Output = FrontendOauthConnectionDefinition; - - fn get_cache(state: Arc) -> GetCache { + fn get_cache( + state: Arc, + ) -> Arc>, Arc>>> { state.connection_oauth_definitions_cache.clone() } } diff --git a/api/src/endpoints/event_access.rs b/api/src/endpoints/event_access.rs index 906f0a50..fef274b1 100644 --- a/api/src/endpoints/event_access.rs +++ b/api/src/endpoints/event_access.rs @@ -1,5 +1,11 @@ -use std::sync::Arc; - +use super::{delete, read, CrudRequest}; +use crate::{ + api_payloads::ErrorResponse, + bad_request, + config::Config, + internal_server_error, + server::{AppState, AppStores}, +}; use anyhow::Result; use axum::{ extract::State, @@ -8,7 +14,7 @@ use axum::{ Extension, Json, Router, }; use integrationos_domain::{ - algebra::adapter::StoreAdapter, + algebra::{MongoStore, StoreExt}, common::{ access_key_data::AccessKeyData, access_key_prefix::AccessKeyPrefix, @@ -16,7 +22,6 @@ use integrationos_domain::{ environment::Environment, event_access::EventAccess, event_type::EventType, - mongo::MongoDbStore, ownership::Ownership, record_metadata::RecordMetadata, AccessKey, @@ -26,20 +31,10 @@ use integrationos_domain::{ use mongodb::bson::doc; use rand::Rng; use serde::{Deserialize, Serialize}; +use std::sync::Arc; use tracing::{error, warn}; - use validator::Validate; -use crate::{ - api_payloads::ErrorResponse, - bad_request, - config::Config, - internal_server_error, - server::{AppState, AppStores}, -}; - -use super::{delete, read, CrudRequest}; - const DEFAULT_GROUP: &str = "event-inc-internal"; const DEFAULT_NAMESPACE: &str = "default"; @@ -76,7 +71,7 @@ impl CrudRequest for CreateEventAccessRequest { unimplemented!() } - fn get_store(stores: AppStores) -> MongoDbStore { + fn get_store(stores: AppStores) -> MongoStore { stores.event_access } diff --git a/api/src/endpoints/events.rs b/api/src/endpoints/events.rs index 59077b03..6d3c8213 100644 --- a/api/src/endpoints/events.rs +++ b/api/src/endpoints/events.rs @@ -1,13 +1,13 @@ -use std::sync::Arc; - +use super::{read, CrudRequest}; +use crate::server::{AppState, AppStores}; use axum::{routing::get, Router}; use bson::doc; -use integrationos_domain::common::{event_access::EventAccess, mongo::MongoDbStore, Event}; +use integrationos_domain::{ + algebra::MongoStore, + common::{event_access::EventAccess, Event}, +}; use serde::{Deserialize, Serialize}; - -use crate::server::{AppState, AppStores}; - -use super::{read, CrudRequest}; +use std::sync::Arc; pub fn get_router() -> Router> { Router::new().route("/", get(read::)) @@ -28,7 +28,7 @@ impl CrudRequest for CreateEventRequest { unimplemented!() } - fn get_store(stores: AppStores) -> MongoDbStore { + fn get_store(stores: AppStores) -> MongoStore { stores.event } diff --git a/api/src/endpoints/mod.rs b/api/src/endpoints/mod.rs index 090fe216..ff09aa28 100644 --- a/api/src/endpoints/mod.rs +++ b/api/src/endpoints/mod.rs @@ -1,23 +1,6 @@ -pub mod common_enum; -pub mod common_model; -pub mod connection; -pub mod connection_definition; -pub mod connection_model_definition; -pub mod connection_model_schema; -pub mod connection_oauth_definition; -pub mod event_access; -pub mod events; -pub mod metrics; -pub mod oauth; -pub mod openapi; -pub mod passthrough; -pub mod pipeline; -pub mod transactions; -pub mod unified; - use crate::{ api_payloads::ErrorResponse, - bad_request, internal_server_error, not_found, + internal_server_error, not_found, server::{AppState, AppStores}, util::shape_mongo_filter, }; @@ -30,9 +13,9 @@ use axum::{ use bson::{doc, SerializerOptions}; use http::{HeaderMap, HeaderValue, StatusCode}; use integrationos_domain::{ - algebra::adapter::StoreAdapter, - common::{event_access::EventAccess, mongo::MongoDbStore, Connection}, - IntegrationOSError, OAuth, Store, + algebra::{MongoStore, StoreExt}, + common::{event_access::EventAccess, Connection}, + ApplicationError, IntegrationOSError, InternalError, OAuth, Store, }; use moka::future::Cache; use mongodb::options::FindOneOptions; @@ -41,11 +24,24 @@ use std::{collections::BTreeMap, sync::Arc}; use tokio::try_join; use tracing::error; -const INTEGRATION_OS_PASSTHROUGH_HEADER: &str = "x-integrationos-passthrough"; +pub mod common_enum; +pub mod common_model; +pub mod connection; +pub mod connection_definition; +pub mod connection_model_definition; +pub mod connection_model_schema; +pub mod connection_oauth_definition; +pub mod event_access; +pub mod events; +pub mod metrics; +pub mod oauth; +pub mod openapi; +pub mod passthrough; +pub mod pipeline; +pub mod transactions; +pub mod unified; -pub type GetCache = Arc>, Arc>>>; -pub type ApiError = (StatusCode, Json); -pub type ApiResult = Result, ApiError>; +const INTEGRATION_OS_PASSTHROUGH_HEADER: &str = "x-integrationos-passthrough"; pub trait CrudRequest: Sized { type Output: Serialize + DeserializeOwned + Unpin + Sync + Send + 'static; @@ -54,15 +50,18 @@ pub trait CrudRequest: Sized { fn into_with_event_access(self, event_access: Arc) -> Self::Output; fn into_public(self) -> Result; fn update(self, record: &mut Self::Output); - fn get_store(stores: AppStores) -> MongoDbStore; + fn get_store(stores: AppStores) -> MongoStore; } -pub trait CachedRequest: Sized { - type Output: Serialize + DeserializeOwned + Unpin + Sync + Send + 'static; - - fn get_cache(state: Arc) -> GetCache; +pub trait CachedRequest: CrudRequest { + fn get_cache( + state: Arc, + ) -> Arc>, Arc>>>; } +pub type ApiError = (StatusCode, Json); +pub type ApiResult = Result, ApiError>; + #[async_trait] pub trait CrudHook where @@ -194,7 +193,7 @@ pub async fn read_cached( State(state): State>, ) -> Result>>, ApiError> where - T: CrudRequest + CachedRequest + 'static, + T: CachedRequest + 'static, U: Clone + Serialize + DeserializeOwned + Unpin + Sync + Send + 'static, { let cache = T::get_cache(state.clone()); @@ -357,19 +356,21 @@ where struct SparseConnection { oauth: OAuth, } - async fn get_connection( access: &EventAccess, connection_id: &HeaderValue, stores: &AppStores, cache: &Cache<(Arc, HeaderValue), Arc>, -) -> Result, ApiError> { +) -> Result, IntegrationOSError> { let connection = cache .try_get_with( (access.ownership.id.clone(), connection_id.clone()), async { let Ok(connection_id_str) = connection_id.to_str() else { - return Err(bad_request!("Invalid connection key header")); + return Err(ApplicationError::bad_request( + "Invalid connection key header", + None, + )); }; let connection = match stores @@ -383,12 +384,12 @@ async fn get_connection( { Ok(Some(data)) => Arc::new(data), Ok(None) => { - return Err(not_found!("Connection")); + return Err(ApplicationError::not_found("Connection", None)); } Err(e) => { error!("Error fetching connection: {:?}", e); - return Err(internal_server_error!()); + return Err(InternalError::unknown("Error fetching connection", None)); } }; @@ -418,12 +419,12 @@ async fn get_connection( { Ok(Some(data)) => data, Ok(None) => { - return Err(not_found!("Connection")); + return Err(ApplicationError::not_found("Connection", None)); } Err(e) => { error!("Error fetching connection: {:?}", e); - return Err(internal_server_error!()); + return Err(InternalError::unknown("Error fetching connection", None)); } }; let mut connection = (*connection).clone(); diff --git a/api/src/endpoints/oauth.rs b/api/src/endpoints/oauth.rs index 66415d38..d5234835 100644 --- a/api/src/endpoints/oauth.rs +++ b/api/src/endpoints/oauth.rs @@ -1,6 +1,5 @@ use super::event_access::CreateEventAccessPayloadWithOwnership; use crate::{endpoints::ApiError, internal_server_error, not_found, server::AppState}; -use anyhow::anyhow; use axum::{ extract::{Path, State}, routing::post, @@ -9,7 +8,7 @@ use axum::{ use chrono::{Duration, Utc}; use http::{HeaderMap, HeaderName, HeaderValue}; use integrationos_domain::{ - algebra::adapter::StoreAdapter, + algebra::{MongoStore, StoreExt}, common::{ api_model_config::ContentType, connection_definition::ConnectionDefinition, @@ -17,7 +16,6 @@ use integrationos_domain::{ Computation, ConnectionOAuthDefinition, OAuthResponse, PlatformSecret, Settings, }, event_access::EventAccess, - mongo::MongoDbStore, ownership::Ownership, Connection, OAuth, Throughput, }, @@ -198,11 +196,7 @@ async fn oauth_handler( expires_at: Some( (chrono::Utc::now() + Duration::try_seconds(oauth_secret.expires_in as i64) - .ok_or(anyhow!("Invalid expires_in timestamp")) - .map_err(|e| { - error!("Failed to decode oauth response: {}", e); - internal_server_error!() - })?) + .unwrap_or(Duration::zero())) .timestamp(), ), }), @@ -413,7 +407,7 @@ async fn find_connection_definition( state: &State>, connection_definition_id: &Id, ) -> ApiResult { - let connection_definition_store: &MongoDbStore = + let connection_definition_store: &MongoStore = &state.app_stores.connection_config; let connection_definition: ConnectionDefinition = connection_definition_store @@ -432,7 +426,7 @@ async fn find_oauth_definition( state: &State>, platform: &str, ) -> ApiResult { - let oauth_definition_store: &MongoDbStore = + let oauth_definition_store: &MongoStore = &state.app_stores.oauth_config; let oauth_definition: ConnectionOAuthDefinition = oauth_definition_store @@ -452,7 +446,7 @@ async fn find_settings( ownership: &Ownership, is_admin: bool, ) -> ApiResult { - let settings_store: &MongoDbStore = &state.app_stores.settings; + let settings_store: &MongoStore = &state.app_stores.settings; let ownership_id = if !is_admin { ownership.id.to_string() diff --git a/api/src/endpoints/openapi.rs b/api/src/endpoints/openapi.rs index c54f8b7f..006af3f2 100644 --- a/api/src/endpoints/openapi.rs +++ b/api/src/endpoints/openapi.rs @@ -11,9 +11,8 @@ use futures::{Stream, StreamExt, TryStreamExt}; use http::StatusCode; use indexmap::IndexMap; use integrationos_domain::{ - algebra::{adapter::StoreAdapter, measured::TimedExt}, + algebra::{MongoStore, StoreExt, TimedExt}, common_model::{CommonEnum, CommonModel}, - mongo::MongoDbStore, }; use mongodb::error::Error as MongoError; use openapiv3::*; @@ -51,8 +50,8 @@ impl OpenAPIData { pub fn spawn_openapi_generation( &self, - cm_store: MongoDbStore, - ce_store: MongoDbStore, + cm_store: MongoStore, + ce_store: MongoStore, ) -> JoinHandle> { spawn_openapi_generation(cm_store, ce_store, self.clone()) } @@ -168,8 +167,8 @@ pub async fn get_openapi( } fn spawn_openapi_generation( - cm_store: MongoDbStore, - ce_store: MongoDbStore, + cm_store: MongoStore, + ce_store: MongoStore, state: OpenAPIData, ) -> JoinHandle> { tokio::spawn(async move { @@ -279,8 +278,8 @@ fn spawn_openapi_generation( async fn generate_references_data( cm: CommonModel, - cm_store: MongoDbStore, - ce_store: MongoDbStore, + cm_store: MongoStore, + ce_store: MongoStore, ) -> Result { let mut schema = IndexMap::new(); let (child_cms, missing) = cm diff --git a/api/src/endpoints/passthrough.rs b/api/src/endpoints/passthrough.rs index 76d45a9d..68955fd9 100644 --- a/api/src/endpoints/passthrough.rs +++ b/api/src/endpoints/passthrough.rs @@ -1,5 +1,5 @@ -use std::{collections::HashMap, sync::Arc}; - +use super::{get_connection, INTEGRATION_OS_PASSTHROUGH_HEADER}; +use crate::{metrics::Metric, server::AppState}; use axum::{ extract::{Query, State}, response::IntoResponse, @@ -8,16 +8,16 @@ use axum::{ }; use http::{header::CONTENT_LENGTH, HeaderMap, HeaderName, Method, Uri}; use hyper::body::Bytes; -use integrationos_domain::common::{ - destination::{Action, Destination}, - event_access::EventAccess, +use integrationos_domain::{ + common::{ + destination::{Action, Destination}, + event_access::EventAccess, + }, + ApplicationError, InternalError, }; +use std::{collections::HashMap, sync::Arc}; use tracing::error; -use crate::{bad_request, metrics::Metric, server::AppState, service_unavailable}; - -use super::{get_connection, INTEGRATION_OS_PASSTHROUGH_HEADER}; - pub fn get_router() -> Router> { Router::new().route( "/*key", @@ -38,7 +38,10 @@ pub async fn passthrough_request( body: Bytes, ) -> impl IntoResponse { let Some(connection_key_header) = headers.get(&state.config.headers.connection_header) else { - return Err(bad_request!("Missing connection key header")); + return Err(ApplicationError::bad_request( + "Connection header not found", + None, + )); }; let connection = get_connection( @@ -79,7 +82,7 @@ pub async fn passthrough_request( e ); - service_unavailable!() + e })?; let mut headers = HeaderMap::new(); @@ -113,7 +116,7 @@ pub async fn passthrough_request( e ); - service_unavailable!() + InternalError::script_error("Error retrieving bytes from response", None) })?; Ok((status, headers, bytes)) diff --git a/api/src/endpoints/pipeline.rs b/api/src/endpoints/pipeline.rs index bf5c4f6f..11aad767 100644 --- a/api/src/endpoints/pipeline.rs +++ b/api/src/endpoints/pipeline.rs @@ -3,10 +3,11 @@ use crate::server::{AppState, AppStores}; use axum::{routing::post, Router}; use bson::doc; use integrationos_domain::{ + algebra::MongoStore, common::{ configuration::pipeline::PipelineConfig, destination::Destination, - event_access::EventAccess, middleware::Middleware, mongo::MongoDbStore, - record_metadata::RecordMetadata, signature::Signature, source::Source, Pipeline, + event_access::EventAccess, middleware::Middleware, record_metadata::RecordMetadata, + signature::Signature, source::Source, Pipeline, }, id::{prefix::IdPrefix, Id}, }; @@ -86,7 +87,7 @@ impl CrudRequest for CreatePipelineRequest { record.record_metadata.mark_updated(&record.ownership.id); } - fn get_store(stores: AppStores) -> MongoDbStore { + fn get_store(stores: AppStores) -> MongoStore { stores.pipeline } } diff --git a/api/src/endpoints/transactions.rs b/api/src/endpoints/transactions.rs index 5b6e47ab..f7978a5b 100644 --- a/api/src/endpoints/transactions.rs +++ b/api/src/endpoints/transactions.rs @@ -1,13 +1,13 @@ -use std::sync::Arc; - +use super::{read, CrudRequest}; +use crate::server::{AppState, AppStores}; use axum::{routing::get, Router}; use bson::doc; -use integrationos_domain::common::{event_access::EventAccess, mongo::MongoDbStore, Transaction}; +use integrationos_domain::{ + algebra::MongoStore, + common::{event_access::EventAccess, Transaction}, +}; use serde::{Deserialize, Serialize}; - -use crate::server::{AppState, AppStores}; - -use super::{read, CrudRequest}; +use std::sync::Arc; pub fn get_router() -> Router> { Router::new().route("/", get(read::)) @@ -32,7 +32,7 @@ impl CrudRequest for TransactionCrud { unimplemented!() } - fn get_store(stores: AppStores) -> MongoDbStore { + fn get_store(stores: AppStores) -> MongoStore { stores.transactions } } diff --git a/api/src/endpoints/unified.rs b/api/src/endpoints/unified.rs index b19ce7dc..5ff93402 100644 --- a/api/src/endpoints/unified.rs +++ b/api/src/endpoints/unified.rs @@ -1,5 +1,4 @@ -use std::{collections::HashMap, sync::Arc}; - +use crate::{config::Headers, metrics::Metric, server::AppState}; use axum::{ extract::{Path, Query, State}, response::{IntoResponse, Response}, @@ -15,17 +14,13 @@ use integrationos_domain::{ encrypted_access_key::EncryptedAccessKey, encrypted_data::PASSWORD_LENGTH, event_access::EventAccess, AccessKey, Event, }, - ApplicationError, IntegrationOSError, + ApplicationError, InternalError, }; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; +use std::{collections::HashMap, sync::Arc}; use tracing::error; -use crate::{ - bad_request, config::Headers, debug_error, internal_server_error, metrics::Metric, - not_found_with_custom_message, server::AppState, service_unavailable, -}; - use super::{get_connection, INTEGRATION_OS_PASSTHROUGH_HEADER}; pub fn get_router() -> Router> { @@ -187,7 +182,10 @@ pub async fn process_request( body: Option, ) -> impl IntoResponse { let Some(connection_key_header) = headers.get(&state.config.headers.connection_header) else { - return Err(bad_request!("Missing connection key header")); + return Err(ApplicationError::bad_request( + "Missing connection key header", + None, + )); }; let connection = get_connection( user_event_access.as_ref(), @@ -195,7 +193,11 @@ pub async fn process_request( &state.app_stores, &state.connections_cache, ) - .await?; + .await + .map_err(|e| { + error!("Error getting connection: {:?}", e); + e + })?; let Query(query_params) = query_params.unwrap_or_default(); @@ -215,7 +217,7 @@ pub async fn process_request( .. } = &action else { - return Err(internal_server_error!()); + return Err(ApplicationError::bad_request("Invalid action", None)); }; let event_name = format!( "{}::{}::{}::{}", @@ -238,18 +240,7 @@ pub async fn process_request( "Error executing connection model definition in unified endpoint: {:?}", e ); - if state.config.debug_mode { - return debug_error!(format!("{e:?}")); - } - match e { - IntegrationOSError::Internal(_) => service_unavailable!(), - IntegrationOSError::Application(e) => match e { - ApplicationError::NotFound { .. } => { - not_found_with_custom_message!("The requested resource was not found") - } - _ => internal_server_error!(), - }, - } + e })?; *response.headers_mut() = response @@ -276,12 +267,15 @@ pub async fn process_request( .try_into() .map_err(|e| { error!("event_access_password is not 32 bytes in length: {e}"); - internal_server_error!() + InternalError::decryption_error( + "event_access_password is not 32 bytes in length", + None, + ) })?; let access_key = AccessKey::parse(&encrypted_access_key, &password).map_err(|e| { error!("Could not decrypt access key: {e}"); - internal_server_error!() + InternalError::decryption_error("Could not decrypt access key", None) })?; const META: &str = "meta"; let body = serde_json::to_string(&json!({ @@ -289,7 +283,7 @@ pub async fn process_request( })) .map_err(|e| { error!("Could not serialize meta body to string: {e}"); - internal_server_error!() + InternalError::invalid_argument("Could not serialize meta body to string", None) })?; let name = if parts.status.is_success() { diff --git a/api/src/middleware/auth.rs b/api/src/middleware/auth.rs index aa2a7673..dc8c4f8e 100644 --- a/api/src/middleware/auth.rs +++ b/api/src/middleware/auth.rs @@ -3,7 +3,7 @@ use crate::{ }; use axum::{extract::State, middleware::Next, response::Response}; use http::Request; -use integrationos_domain::{algebra::adapter::StoreAdapter, ApplicationError, InternalError}; +use integrationos_domain::{algebra::StoreExt, ApplicationError, InternalError}; use mongodb::bson::doc; use std::sync::Arc; use tracing::error; @@ -43,7 +43,7 @@ pub async fn auth( "deleted": false }) .await - .map_err(|e| InternalError::connection_error(&e.to_string(), None))?; + .map_err(|e| InternalError::connection_error(e.as_ref(), None))?; if let Some(event_access) = event_access { Ok(Arc::new(event_access)) diff --git a/api/src/middleware/rate_limiter.rs b/api/src/middleware/rate_limiter.rs index 3b096c25..5606a026 100644 --- a/api/src/middleware/rate_limiter.rs +++ b/api/src/middleware/rate_limiter.rs @@ -7,8 +7,8 @@ use axum::{ Extension, }; use http::{HeaderName, Request}; -use integrationos_domain::event_access::EventAccess; -use redis_retry::{AsyncCommands, Redis}; +use integrationos_domain::{algebra::RedisCache, event_access::EventAccess}; +use redis::AsyncCommands; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; use tracing::warn; @@ -25,7 +25,7 @@ pub struct RateLimiter { impl RateLimiter { pub async fn new(state: Arc) -> Result { - let mut redis = Redis::new_with_retry_count(&state.config.redis_config, 0) + let mut redis = RedisCache::new(&state.config.redis_config, 0) .await .with_context(|| "Could not connect to redis")?; diff --git a/api/src/server.rs b/api/src/server.rs index 0ad11ef8..045d076a 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -2,7 +2,7 @@ use crate::{ config::Config, endpoints::{ connection_oauth_definition::FrontendOauthConnectionDefinition, openapi::OpenAPIData, - GetCache, + ReadResponse, }, metrics::Metric, routes, @@ -11,7 +11,7 @@ use anyhow::{anyhow, Context, Result}; use axum::Router; use http::HeaderValue; use integrationos_domain::{ - algebra::crypto::Crypto, + algebra::{CryptoExt, MongoStore}, common::{ common_model::CommonModel, connection_definition::ConnectionDefinition, @@ -20,7 +20,6 @@ use integrationos_domain::{ connection_oauth_definition::{ConnectionOAuthDefinition, Settings}, cursor::Cursor, event_access::EventAccess, - mongo::MongoDbStore, stage::Stage, Connection, Event, Pipeline, Store, Transaction, }, @@ -31,30 +30,30 @@ use integrationos_domain::{ use moka::future::Cache; use mongodb::{options::UpdateOptions, Client, Database}; use segment::{AutoBatcher, Batcher, HttpClient}; -use std::{sync::Arc, time::Duration}; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; use tokio::{sync::mpsc::Sender, time::timeout, try_join}; use tracing::{error, info, trace, warn}; #[derive(Clone)] pub struct AppStores { pub db: Database, - pub model_config: MongoDbStore, - pub oauth_config: MongoDbStore, - pub frontend_oauth_config: MongoDbStore, - pub model_schema: MongoDbStore, - pub public_model_schema: MongoDbStore, - pub common_model: MongoDbStore, - pub common_enum: MongoDbStore, - pub connection: MongoDbStore, - pub public_connection_details: MongoDbStore, - pub settings: MongoDbStore, - pub connection_config: MongoDbStore, - pub pipeline: MongoDbStore, - pub event_access: MongoDbStore, - pub event: MongoDbStore, - pub transactions: MongoDbStore, - pub cursors: MongoDbStore, - pub stages: MongoDbStore, + pub model_config: MongoStore, + pub oauth_config: MongoStore, + pub frontend_oauth_config: MongoStore, + pub model_schema: MongoStore, + pub public_model_schema: MongoStore, + pub common_model: MongoStore, + pub common_enum: MongoStore, + pub connection: MongoStore, + pub public_connection_details: MongoStore, + pub settings: MongoStore, + pub connection_config: MongoStore, + pub pipeline: MongoStore, + pub event_access: MongoStore, + pub event: MongoStore, + pub transactions: MongoStore, + pub cursors: MongoStore, + pub stages: MongoStore, } #[derive(Clone)] @@ -65,9 +64,15 @@ pub struct AppState { pub openapi_data: OpenAPIData, pub http_client: reqwest::Client, pub connections_cache: Cache<(Arc, HeaderValue), Arc>, - pub connection_definitions_cache: GetCache, - pub connection_oauth_definitions_cache: GetCache, - pub secrets_client: Arc, + pub connection_definitions_cache: + Arc>, Arc>>>, + pub connection_oauth_definitions_cache: Arc< + Cache< + Option>, + Arc>, + >, + >, + pub secrets_client: Arc, pub extractor_caller: UnifiedDestination, pub event_tx: Sender, pub metric_tx: Sender, @@ -81,7 +86,7 @@ pub struct Server { impl Server { pub async fn init( config: Config, - secrets_client: Arc, + secrets_client: Arc, ) -> Result { let client = Client::with_uri_str(&config.db_config.control_db_url).await?; let db = client.database(&config.db_config.control_db_name); @@ -89,30 +94,26 @@ impl Server { let http_client = reqwest::ClientBuilder::new() .timeout(Duration::from_secs(config.http_client_timeout_secs)) .build()?; - let model_config = - MongoDbStore::new_with_db(db.clone(), Store::ConnectionModelDefinitions).await?; - let oauth_config = - MongoDbStore::new_with_db(db.clone(), Store::ConnectionOAuthDefinitions).await?; + let model_config = MongoStore::new(&db, &Store::ConnectionModelDefinitions).await?; + let oauth_config = MongoStore::new(&db, &Store::ConnectionOAuthDefinitions).await?; let frontend_oauth_config = - MongoDbStore::new_with_db(db.clone(), Store::ConnectionOAuthDefinitions).await?; - let model_schema = - MongoDbStore::new_with_db(db.clone(), Store::ConnectionModelSchemas).await?; + MongoStore::new(&db, &Store::ConnectionOAuthDefinitions).await?; + let model_schema = MongoStore::new(&db, &Store::ConnectionModelSchemas).await?; let public_model_schema = - MongoDbStore::new_with_db(db.clone(), Store::PublicConnectionModelSchemas).await?; - let common_model = MongoDbStore::new_with_db(db.clone(), Store::CommonModels).await?; - let common_enum = MongoDbStore::new_with_db(db.clone(), Store::CommonEnums).await?; - let connection = MongoDbStore::new_with_db(db.clone(), Store::Connections).await?; + MongoStore::new(&db, &Store::PublicConnectionModelSchemas).await?; + let common_model = MongoStore::new(&db, &Store::CommonModels).await?; + let common_enum = MongoStore::new(&db, &Store::CommonEnums).await?; + let connection = MongoStore::new(&db, &Store::Connections).await?; let public_connection_details = - MongoDbStore::new_with_db(db.clone(), Store::PublicConnectionDetails).await?; - let settings = MongoDbStore::new_with_db(db.clone(), Store::Settings).await?; - let connection_config = - MongoDbStore::new_with_db(db.clone(), Store::ConnectionDefinitions).await?; - let pipeline = MongoDbStore::new_with_db(db.clone(), Store::Pipelines).await?; - let event_access = MongoDbStore::new_with_db(db.clone(), Store::EventAccess).await?; - let event = MongoDbStore::new_with_db(db.clone(), Store::Events).await?; - let transactions = MongoDbStore::new_with_db(db.clone(), Store::Transactions).await?; - let cursors = MongoDbStore::new_with_db(db.clone(), Store::Cursors).await?; - let stages = MongoDbStore::new_with_db(db.clone(), Store::Stages).await?; + MongoStore::new(&db, &Store::PublicConnectionDetails).await?; + let settings = MongoStore::new(&db, &Store::Settings).await?; + let connection_config = MongoStore::new(&db, &Store::ConnectionDefinitions).await?; + let pipeline = MongoStore::new(&db, &Store::Pipelines).await?; + let event_access = MongoStore::new(&db, &Store::EventAccess).await?; + let event = MongoStore::new(&db, &Store::Events).await?; + let transactions = MongoStore::new(&db, &Store::Transactions).await?; + let cursors = MongoStore::new(&db, &Store::Cursors).await?; + let stages = MongoStore::new(&db, &Store::Stages).await?; let extractor_caller = UnifiedDestination::new( config.db_config.clone(), diff --git a/api/tests/api_tests/test_server/mod.rs b/api/tests/api_tests/test_server/mod.rs index cf9d85ef..04cdf227 100644 --- a/api/tests/api_tests/test_server/mod.rs +++ b/api/tests/api_tests/test_server/mod.rs @@ -1,9 +1,3 @@ -use std::{ - collections::{BTreeMap, HashMap}, - sync::{Arc, OnceLock, RwLock}, - time::Duration, -}; - use anyhow::Result; use api::{ config::Config, @@ -19,7 +13,7 @@ use fake::{Fake, Faker}; use http::StatusCode; use http::{header::AUTHORIZATION, Method}; use integrationos_domain::{ - algebra::adapter::StoreAdapter, + algebra::{CryptoExt, MongoStore, StoreExt}, common::{ access_key_data::AccessKeyData, access_key_prefix::AccessKeyPrefix, @@ -31,12 +25,10 @@ use integrationos_domain::{ environment::Environment, event_access::EventAccess, event_type::EventType, - mongo::MongoDbStore, AccessKey, Connection, Store, }, create_secret_response::{CreateSecretAuthor, CreateSecretResponse}, get_secret_request::GetSecretRequest, - prelude::crypto::Crypto, IntegrationOSError, }; use mockito::{Matcher, Server as MockServer, ServerGuard}; @@ -45,6 +37,11 @@ use rand::Rng; use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use serde_json::{from_value, to_value}; +use std::{ + collections::{BTreeMap, HashMap}, + sync::{Arc, OnceLock, RwLock}, + time::Duration, +}; use testcontainers_modules::{ mongo::Mongo, redis::Redis, @@ -99,7 +96,7 @@ pub struct MockSecretsClient { } #[async_trait] -impl Crypto for MockSecretsClient { +impl CryptoExt for MockSecretsClient { async fn encrypt( &self, key: String, @@ -240,9 +237,8 @@ impl TestServer { let db = Client::with_uri_str(&db).await.unwrap().database(&db_name); - let store: MongoDbStore = MongoDbStore::new_with_db(db, Store::EventAccess) - .await - .unwrap(); + let store: MongoStore = + MongoStore::new(&db, &Store::EventAccess).await.unwrap(); store .create_many(&[live.clone(), test.clone()]) diff --git a/event-core/Cargo.toml b/event-core/Cargo.toml index 9b01ce15..c33c14eb 100644 --- a/event-core/Cargo.toml +++ b/event-core/Cargo.toml @@ -12,7 +12,6 @@ chrono.workspace = true dotenvy.workspace = true envconfig.workspace = true futures.workspace = true -google-token-fetcher = { path = "../google-token-fetcher" } handlebars.workspace = true http.workspace = true integrationos-domain = { workspace = true, features = ["unified"] } @@ -21,7 +20,7 @@ metrics = "0.21.1" metrics-exporter-prometheus = "0.12.1" moka.workspace = true mongodb.workspace = true -redis-retry = { path = "../redis-retry" } +redis.workspace = true reqwest.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/event-core/src/config.rs b/event-core/src/config.rs index 7b467b17..0a2ac240 100644 --- a/event-core/src/config.rs +++ b/event-core/src/config.rs @@ -1,6 +1,7 @@ use envconfig::Envconfig; -use integrationos_domain::common::{database::DatabaseConfig, secrets::SecretsConfig}; -use redis_retry::Config as RedisConfig; +use integrationos_domain::common::{ + cache::CacheConfig as RedisConfig, database::DatabaseConfig, secrets::SecretsConfig, +}; use std::fmt::{Display, Formatter}; #[derive(Envconfig, Clone)] // Intentionally no Debug so secret is not printed diff --git a/event-core/src/dispatcher.rs b/event-core/src/dispatcher.rs index cfc22f6f..81ba96b1 100644 --- a/event-core/src/dispatcher.rs +++ b/event-core/src/dispatcher.rs @@ -9,12 +9,14 @@ use futures::{ FutureExt, }; use integrationos_domain::{ - algebra::execution::{ExecutionContext, Status}, + algebra::{PipelineExt, PipelineStatus}, common::{ - extractor_context::Stage as ExtractorStage, middleware::Middleware, - pipeline_context::Stage as PipelineStage, root_context::Stage as RootStage, Event, - ExtractorContext, PipelineContext, RootContext, Transaction, + extractor_context::Stage as ExtractorStage, middleware::Middleware, ExtractorContext, + PipelineContext, RootContext, Transaction, }, + pipeline_context::PipelineStage, + root_context::RootStage, + Event, }; use js_sandbox_ios::Script; use serde_json::{json, Value}; @@ -138,7 +140,7 @@ where context.stage = RootStage::Verified; } else { warn!("Event did not verify, dropped"); - context.status = Status::Dropped { + context.status = PipelineStatus::Dropped { reason: "Did not verify".to_owned(), }; } @@ -369,7 +371,7 @@ where } } } - context.status = Status::Dropped { + context.status = PipelineStatus::Dropped { reason: "Failed destination".to_string(), }; warn!("Failed destination"); @@ -459,7 +461,7 @@ where } } - context.status = Status::Dropped { + context.status = PipelineStatus::Dropped { reason: "Failed extractor".to_string(), }; self.context_store.set(context.clone()).await?; diff --git a/event-core/src/event_handler.rs b/event-core/src/event_handler.rs index 7884c889..a8f50954 100644 --- a/event-core/src/event_handler.rs +++ b/event-core/src/event_handler.rs @@ -1,7 +1,11 @@ use crate::store::{ContextStore, ControlDataStore}; use anyhow::{Context, Result}; -use integrationos_domain::common::{event_with_context::EventWithContext, Event, Transaction}; -use redis_retry::{AsyncCommands, Config, Redis}; +use integrationos_domain::{ + algebra::RedisCache, + cache::CacheConfig, + common::{event_with_context::EventWithContext, Event, Transaction}, +}; +use redis::AsyncCommands; use std::{sync::Arc, time::Duration}; use tokio::{join, sync::Mutex, time::sleep}; use tracing::error; @@ -11,8 +15,8 @@ pub struct EventHandler< T: ControlDataStore + Sync + Send + 'static, U: ContextStore + Sync + Send + 'static, > { - config: Config, - redis: Arc>, + config: CacheConfig, + redis: Arc>, control_store: Arc, context_store: Arc, } @@ -20,8 +24,12 @@ pub struct EventHandler< impl EventHandler { - pub async fn new(config: Config, control_store: Arc, context_store: Arc) -> Result { - let redis = Arc::new(Mutex::new(Redis::new(&config).await?)); + pub async fn new( + config: CacheConfig, + control_store: Arc, + context_store: Arc, + ) -> Result { + let redis = Arc::new(Mutex::new(RedisCache::new(&config, 100).await?)); Ok(Self { config, diff --git a/event-core/src/mocks/mock_context_store.rs b/event-core/src/mocks/mock_context_store.rs index ac5f38c1..aa0213dc 100644 --- a/event-core/src/mocks/mock_context_store.rs +++ b/event-core/src/mocks/mock_context_store.rs @@ -1,14 +1,14 @@ use crate::store::ContextStore; use anyhow::{anyhow, Result}; use async_trait::async_trait; -use integrationos_domain::{algebra::execution::ExecutionContext, id::Id}; +use integrationos_domain::{algebra::PipelineExt, id::Id}; use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, sync::{Arc, Mutex}, }; -type Contexts = Arc>>>>; +type Contexts = Arc>>>>; #[derive(Clone, Default)] pub struct MockContextStorage { @@ -25,7 +25,7 @@ impl MockContextStorage { #[async_trait] impl ContextStore for MockContextStorage { - async fn get Deserialize<'a> + Unpin>( + async fn get Deserialize<'a> + Unpin>( &self, context_key: &Id, ) -> Result { @@ -37,13 +37,13 @@ impl ContextStore for MockContextStorage { let last = c.last(); last.expect("No context for {context_key}") .downcast_ref::() - .expect("ExecutionContext could not be downcast") + .expect("PipelineExt could not be downcast") .clone() }) .ok_or(anyhow!("No context for {context_key}")) } - async fn set(&self, context: T) -> Result<()> { + async fn set(&self, context: T) -> Result<()> { let context = Box::new(context); self.contexts .lock() diff --git a/event-core/src/mocks/mock_secret_service.rs b/event-core/src/mocks/mock_secret_service.rs index 308c27ba..8f27ef6c 100644 --- a/event-core/src/mocks/mock_secret_service.rs +++ b/event-core/src/mocks/mock_secret_service.rs @@ -1,7 +1,7 @@ use anyhow::Result; use async_trait::async_trait; use integrationos_domain::{ - algebra::crypto::Crypto, create_secret_response::CreateSecretResponse, + algebra::CryptoExt, create_secret_response::CreateSecretResponse, get_secret_request::GetSecretRequest, IntegrationOSError, }; @@ -9,7 +9,7 @@ use integrationos_domain::{ pub struct MockSecretsClient; #[async_trait] -impl Crypto for MockSecretsClient { +impl CryptoExt for MockSecretsClient { async fn decrypt( &self, _secret: &GetSecretRequest, diff --git a/event-core/src/mongo_context_store.rs b/event-core/src/mongo_context_store.rs index 445db928..d574c2bc 100644 --- a/event-core/src/mongo_context_store.rs +++ b/event-core/src/mongo_context_store.rs @@ -2,7 +2,7 @@ use crate::{config::EventCoreConfig, store::ContextStore}; use anyhow::{anyhow, Result}; use async_trait::async_trait; use bson::doc; -use integrationos_domain::{algebra::execution::ExecutionContext, id::Id}; +use integrationos_domain::{algebra::PipelineExt, id::Id}; use mongodb::{Client, Database}; use serde::{Deserialize, Serialize}; use std::time::Instant; @@ -26,7 +26,7 @@ impl MongoContextStore { #[async_trait] impl ContextStore for MongoContextStore { - async fn get Deserialize<'a> + Unpin>( + async fn get Deserialize<'a> + Unpin>( &self, context_key: &Id, ) -> Result { @@ -37,11 +37,11 @@ impl ContextStore for MongoContextStore { Ok(context.ok_or_else(|| anyhow!("No context found"))?) } - async fn set(&self, context: T) -> Result<()> { + async fn set(&self, context: T) -> Result<()> { let instant = Instant::now(); let coll = self.db.collection(&self.collection_name); if let Err(e) = coll.insert_one(context, None).await { - error!("ExecutionContext insertion error {e}"); + error!("PipelineExt insertion error {e}"); } trace!( "Wrote context in {}", diff --git a/event-core/src/mongo_control_data_store.rs b/event-core/src/mongo_control_data_store.rs index 1e04d7e8..c068c449 100644 --- a/event-core/src/mongo_control_data_store.rs +++ b/event-core/src/mongo_control_data_store.rs @@ -6,19 +6,14 @@ use anyhow::{bail, Context as AnyhowContext, Result}; use async_trait::async_trait; use bson::{doc, SerializerOptions}; use futures::future::join_all; -use google_token_fetcher::GoogleTokenFetcher; use handlebars::Handlebars; use http::header::AUTHORIZATION; use integrationos_domain::{ - algebra::{adapter::StoreAdapter, crypto::Crypto}, + algebra::{CryptoExt, FecherExt, GoogleTokenFetcher, MongoStore, StoreExt}, common::{ - duplicates::Duplicates, - encrypted_access_key::EncryptedAccessKey, - event_access::EventAccess, - extractor::HttpExtractor, - middleware::Middleware, - mongo::{MongoDbStore, MongoDbStoreConfig}, - Connection, Event, Pipeline, Store, + duplicates::Duplicates, encrypted_access_key::EncryptedAccessKey, + event_access::EventAccess, extractor::HttpExtractor, middleware::Middleware, Connection, + Event, Pipeline, Store, }, id::Id, service::unified_destination::UnifiedDestination, @@ -32,10 +27,10 @@ use tracing::{error, warn}; #[derive(Clone)] pub struct MongoControlDataStore { - pub connections_store: MongoDbStore, - pub event_store: MongoDbStore, - pub event_access_store: MongoDbStore, - pub pipelines_store: MongoDbStore, + pub connections_store: MongoStore, + pub event_store: MongoStore, + pub event_access_store: MongoStore, + pub pipelines_store: MongoStore, pub connections_cache: Cache, pub event_cache: Cache, pub event_access_cache: Cache, @@ -49,7 +44,7 @@ pub struct MongoControlDataStore { impl MongoControlDataStore { pub async fn new( config: &EventCoreConfig, - secrets_client: Arc, + secrets_client: Arc, ) -> Result { let mut client_options = ClientOptions::parse(&config.db.control_db_url) .await @@ -61,12 +56,9 @@ impl MongoControlDataStore { let db = client.database(&config.db.control_db_name); - let connections_store = - MongoDbStore::new(MongoDbStoreConfig::new(db.clone(), Store::Connections)).await?; - let event_access_store = - MongoDbStore::new(MongoDbStoreConfig::new(db.clone(), Store::EventAccess)).await?; - let pipelines_store = - MongoDbStore::new(MongoDbStoreConfig::new(db, Store::Pipelines)).await?; + let connections_store = MongoStore::new(&db.clone(), &Store::Connections).await?; + let event_access_store = MongoStore::new(&db.clone(), &Store::EventAccess).await?; + let pipelines_store = MongoStore::new(&db, &Store::Pipelines).await?; let mut event_client_options = ClientOptions::parse(&config.db.event_db_url) .await @@ -77,7 +69,7 @@ impl MongoControlDataStore { .with_context(|| "Failed to create events MongoDB client with options")?; let event_db = client.database(&config.db.event_db_name); - let event_store = MongoDbStore::new(MongoDbStoreConfig::new(event_db, Store::Events)) + let event_store = MongoStore::new(&event_db, &Store::Events) .await .with_context(|| { format!( @@ -124,7 +116,7 @@ impl MongoControlDataStore { } async fn fetch_google_auth_token(&self, url: &str) -> Option { - let token_fetcher = &(self.token_fetcher.clone()?); + let token_fetcher = &(self.clone().token_fetcher?); match token_fetcher.get_token(url).await { Ok(header) => Some(header), Err(_) => None, diff --git a/event-core/src/store.rs b/event-core/src/store.rs index 3cdbfb7e..703d201c 100644 --- a/event-core/src/store.rs +++ b/event-core/src/store.rs @@ -1,7 +1,7 @@ use anyhow::Result; use async_trait::async_trait; use integrationos_domain::{ - algebra::execution::ExecutionContext, + algebra::PipelineExt, common::{duplicates::Duplicates, extractor::HttpExtractor, Connection, Event, Pipeline}, id::Id, }; @@ -10,11 +10,11 @@ use serde_json::Value; #[async_trait] pub trait ContextStore { - async fn get Deserialize<'a> + Unpin>( + async fn get Deserialize<'a> + Unpin>( &self, context_key: &Id, ) -> Result; - async fn set(&self, context: T) -> Result<()>; + async fn set(&self, context: T) -> Result<()>; } #[async_trait] diff --git a/event-core/tests/mock_destination.rs b/event-core/tests/mock_destination.rs index d0d19f29..a7370c98 100644 --- a/event-core/tests/mock_destination.rs +++ b/event-core/tests/mock_destination.rs @@ -11,7 +11,7 @@ use fake::{ }; use http::Method; use integrationos_domain::{ - algebra::crypto::Crypto, + algebra::CryptoExt, common::{ api_model_config::{ApiModelConfig, AuthMethod, SamplesInput, SchemasInput}, connection_model_definition::{ @@ -142,7 +142,7 @@ pub async fn seed_db(config: &EventCoreConfig, base_url: String) -> Id { async fn get_control_store( config: &EventCoreConfig, - secrets_client: Arc, + secrets_client: Arc, ) -> MongoControlDataStore { MongoControlDataStore::new(config, secrets_client) .await @@ -183,7 +183,7 @@ async fn test_send_to_destination() { #[derive(Clone)] struct SecretsClient; #[async_trait::async_trait] - impl Crypto for SecretsClient { + impl CryptoExt for SecretsClient { async fn decrypt(&self, _secret: &GetSecretRequest) -> Result { Ok(json!({ "STRIPE_SECRET_KEY": "Stripe secret key" diff --git a/event-core/tests/mock_storage.rs b/event-core/tests/mock_storage.rs index a3fcbf06..6d9d3ff9 100644 --- a/event-core/tests/mock_storage.rs +++ b/event-core/tests/mock_storage.rs @@ -1,8 +1,3 @@ -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; - use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; use chrono::Utc; @@ -12,25 +7,30 @@ use event_core::{ }; use fake::{Fake, Faker}; use integrationos_domain::{ - algebra::execution::ExecutionContext, + algebra::PipelineExt, common::{ - duplicates::Duplicates, extractor::HttpExtractor, pipeline_context::Stage as PipelineStage, - root_context::Stage, Connection, Event, ExtractorContext, Pipeline, PipelineContext, - RootContext, + duplicates::Duplicates, extractor::HttpExtractor, Connection, Event, ExtractorContext, + Pipeline, PipelineContext, RootContext, }, id::{prefix::IdPrefix, Id}, + pipeline_context::PipelineStage, + root_context::RootStage, }; use serde_json::Value; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; -type Contexts = Arc>>>>; +type Contexts = Arc>>>>; #[derive(Clone, Default)] pub struct MockStorage { pub contexts: Contexts, pub pipelines: Arc>>, pub events: Arc>>, - pub drop_at: Option, - pub fail_at: Option, + pub drop_at: Option, + pub fail_at: Option, pub fail_pipeline_at: Option, } @@ -49,7 +49,7 @@ impl MockStorage { #[async_trait] impl ContextStore for MockStorage { - async fn get(&self, context_key: &Id) -> Result { + async fn get(&self, context_key: &Id) -> Result { self.contexts .lock() .unwrap() @@ -58,13 +58,13 @@ impl ContextStore for MockStorage { let last = c.last(); last.expect("No context for {context_key}") .downcast_ref::() - .expect("ExecutionContext could not be downcast") + .expect("PipelineExt could not be downcast") .clone() }) .ok_or(anyhow!("No context for {context_key}")) } - async fn set(&self, context: T) -> Result<()> { + async fn set(&self, context: T) -> Result<()> { let context = Box::new(context); self.contexts .lock() @@ -93,16 +93,16 @@ impl ControlDataStore for MockStorage { async fn verify_event(&self, _event: &Event) -> Result { fail_at!( self.fail_at, - Some(Stage::ProcessedDuplicates), + Some(RootStage::ProcessedDuplicates), "Failed to fetch event" ); - Ok(self.drop_at != Some(Stage::ProcessedDuplicates)) + Ok(self.drop_at != Some(RootStage::ProcessedDuplicates)) } async fn get_pipelines(&self, _event: &Event) -> Result> { fail_at!( self.fail_at, - Some(Stage::Verified), + Some(RootStage::Verified), "Failed to get pipelines" ); Ok(self.pipelines.lock().unwrap().values().cloned().collect()) @@ -213,14 +213,14 @@ async fn get_and_set_contexts_downcasting_works() { } impl MockStorage { - fn get_at(&self, index: usize) -> T { + fn get_at(&self, index: usize) -> T { let c = self.contexts.lock().unwrap(); let c = c.values().flatten().collect::>(); let last = c.get(index); last.expect("No context for {context_key}") .downcast_ref::() - .expect("ExecutionContext could not be downcast") + .expect("PipelineExt could not be downcast") .clone() } } @@ -274,13 +274,16 @@ async fn run_dispatcher() { for i in 0..7 { match i { - 0 => assert_eq!(root_context!(Stage::Verified), store.get_at(i)), - 1 => assert_eq!(root_context!(Stage::ProcessedDuplicates), store.get_at(i)), + 0 => assert_eq!(root_context!(RootStage::Verified), store.get_at(i)), + 1 => assert_eq!( + root_context!(RootStage::ProcessedDuplicates), + store.get_at(i) + ), 2 => { let mut map = HashMap::new(); map.insert(pipeline.id.clone(), pipeline_context!(PipelineStage::New)); assert_eq!( - root_context!(Stage::ProcessingPipelines(map)), + root_context!(RootStage::ProcessingPipelines(map)), store.get_at(i) ); } diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index eb071429..29399e3e 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -11,12 +11,12 @@ axum-prometheus = "0.4.0" axum.workspace = true dotenvy.workspace = true envconfig.workspace = true -http.workspace = true http-serde-ext.workspace = true +http.workspace = true integrationos-domain.workspace = true moka.workspace = true mongodb.workspace = true -redis-retry = { path = "../redis-retry" } +redis.workspace = true serde.workspace = true serde_json.workspace = true tokio.workspace = true diff --git a/gateway/src/config.rs b/gateway/src/config.rs index ae89ee39..68fd6843 100644 --- a/gateway/src/config.rs +++ b/gateway/src/config.rs @@ -1,6 +1,8 @@ use envconfig::Envconfig; -use integrationos_domain::common::{database::DatabaseConfig, environment::Environment}; -use redis_retry::Config as RedisConfig; +use integrationos_domain::{ + cache::CacheConfig, + common::{database::DatabaseConfig, environment::Environment}, +}; use std::{ fmt::{Display, Formatter}, net::SocketAddr, @@ -17,7 +19,7 @@ pub struct Config { #[envconfig(from = "ENVIRONMENT", default = "live")] pub environment: Environment, #[envconfig(nested = true)] - pub redis: RedisConfig, + pub redis: CacheConfig, #[envconfig(nested = true)] pub db: DatabaseConfig, } @@ -46,7 +48,7 @@ impl Default for Config { cache_size: 10_000, secret_key: "32KFFT_i4UpkJmyPwY2TGzgHpxfXs7zS".to_owned(), environment: Environment::Test, - redis: RedisConfig::default(), + redis: CacheConfig::default(), db: DatabaseConfig::default(), } } diff --git a/gateway/src/finalizer.rs b/gateway/src/finalizer.rs index d72fb2de..097ae730 100644 --- a/gateway/src/finalizer.rs +++ b/gateway/src/finalizer.rs @@ -2,30 +2,28 @@ use crate::{config::Config, finalize_event::FinalizeEvent}; use anyhow::{bail, Context, Result}; use async_trait::async_trait; use integrationos_domain::{ - algebra::adapter::StoreAdapter, + algebra::{MongoStore, RedisCache, StoreExt}, common::{ - encrypted_access_key::EncryptedAccessKey, - event_with_context::EventWithContext, - mongo::{MongoDbStore, MongoDbStoreConfig}, - Event, RootContext, Store, + encrypted_access_key::EncryptedAccessKey, event_with_context::EventWithContext, Event, + RootContext, Store, }, }; use mongodb::Collection; -use redis_retry::{AsyncCommands, Redis}; +use redis::AsyncCommands; use std::sync::Arc; use tokio::sync::Mutex; use tracing::{debug, error}; pub struct Finalizer { - redis: Arc>, + redis: Arc>, context_collection: Collection, - event_store: MongoDbStore, + event_store: MongoStore, queue_name: String, } impl Finalizer { pub async fn new(config: Config) -> Result { - let redis = Redis::new_with_retry_count(&config.redis, 2).await?; + let redis = RedisCache::new(&config.redis, 2).await?; let context_mongo_client = mongodb::Client::with_uri_str(config.db.context_db_url) .await @@ -37,7 +35,7 @@ impl Finalizer { .await .with_context(|| "Could not connect to mongodb")?; let mongo = mongo.database(&config.db.event_db_name); - let event_store = MongoDbStore::new(MongoDbStoreConfig::new(mongo, Store::Events)) + let event_store = MongoStore::new(&mongo, &Store::Events) .await .with_context(|| { format!( diff --git a/google-token-fetcher/Cargo.toml b/google-token-fetcher/Cargo.toml deleted file mode 100644 index 3c3f8af1..00000000 --- a/google-token-fetcher/Cargo.toml +++ /dev/null @@ -1,11 +0,0 @@ -[package] -name = "google-token-fetcher" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -anyhow.workspace = true -reqwest.workspace = true -serde.workspace = true diff --git a/google-token-fetcher/src/lib.rs b/google-token-fetcher/src/lib.rs deleted file mode 100644 index 57d01960..00000000 --- a/google-token-fetcher/src/lib.rs +++ /dev/null @@ -1,39 +0,0 @@ -use anyhow::Result; -use reqwest::Client; -use serde::Deserialize; - -const URL: &str = - "http://metadata/computeMetadata/v1/instance/service-accounts/default/identity?audience="; -const HEADER_KEY: &str = "Metadata-Flavor"; -const HEADER_VALUE: &str = "Google"; - -#[derive(Debug, Clone, Deserialize)] -pub struct GoogleTokenResponse { - data: String, -} - -#[derive(Debug, Clone, Default)] -pub struct GoogleTokenFetcher { - client: Client, -} - -impl GoogleTokenFetcher { - pub fn new() -> Self { - Self { - client: Client::new(), - } - } - - pub async fn get_token(&self, url: &str) -> Result { - let res = self - .client - .get(format!("{URL}{url}")) - .header(HEADER_KEY, HEADER_VALUE) - .send() - .await? - .json::() - .await?; - - Ok(format!("Bearer {}", res.data)) - } -} diff --git a/redis-retry/Cargo.toml b/redis-retry/Cargo.toml deleted file mode 100644 index bfead0fc..00000000 --- a/redis-retry/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "redis-retry" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -anyhow.workspace = true -envconfig.workspace = true -futures-util.workspace = true -redis = { version = "0.23.3", features = ["connection-manager", "tokio-comp"] } -tracing.workspace = true diff --git a/redis-retry/src/config.rs b/redis-retry/src/config.rs deleted file mode 100644 index 53a43ce9..00000000 --- a/redis-retry/src/config.rs +++ /dev/null @@ -1,38 +0,0 @@ -use envconfig::Envconfig; -use std::fmt::{Display, Formatter}; - -#[derive(Envconfig, Debug, Clone)] -pub struct Config { - #[envconfig(from = "REDIS_URL", default = "redis://localhost:6379")] - pub url: String, - #[envconfig(from = "REDIS_QUEUE_NAME", default = "events")] - pub queue_name: String, - #[envconfig(from = "REDIS_EVENT_THROUGHPUT_KEY", default = "event_throughput")] - pub event_throughput_key: String, - #[envconfig(from = "REDIS_API_THROUGHPUT_KEY", default = "api_throughput")] - pub api_throughput_key: String, -} - -impl Default for Config { - fn default() -> Self { - Self { - url: "redis://localhost:6379".to_owned(), - queue_name: "events".to_owned(), - event_throughput_key: "event_throughput".to_owned(), - api_throughput_key: "api_throughput".to_owned(), - } - } -} - -impl Display for Config { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - writeln!(f, "REDIS_URL: {}", self.url)?; - writeln!(f, "REDIS_QUEUE_NAME: {}", self.queue_name)?; - writeln!( - f, - "REDIS_EVENT_THROUGHPUT_KEY: {}", - self.event_throughput_key - )?; - writeln!(f, "REDIS_API_THROUGHPUT_KEY: {}", self.api_throughput_key) - } -} diff --git a/redis-retry/src/lib.rs b/redis-retry/src/lib.rs deleted file mode 100644 index f1c8bdf1..00000000 --- a/redis-retry/src/lib.rs +++ /dev/null @@ -1,81 +0,0 @@ -pub mod config; - -use anyhow::{Context, Result}; -use futures_util::FutureExt; -use redis::{ - aio::{ConnectionLike, ConnectionManager}, - Client, Pipeline, RedisFuture, Value, -}; -use tracing::warn; - -pub use crate::config::Config; -pub use redis::{AsyncCommands, LposOptions, RedisResult}; - -#[derive(Clone)] -pub struct Redis { - client: Client, - conn: ConnectionManager, - retry_count: u64, -} - -impl Redis { - pub async fn new(config: &Config) -> Result { - Self::new_with_retry_count(config, std::u64::MAX).await - } - - pub async fn new_with_retry_count(config: &Config, retry_count: u64) -> Result { - let client = - Client::open(config.url.clone()).with_context(|| "Could not parse redis url")?; - let conn = client - .get_tokio_connection_manager() - .await - .with_context(|| "Could not connect to redis")?; - - Ok(Self { - client, - conn, - retry_count, - }) - } -} - -impl ConnectionLike for Redis { - fn req_packed_command<'a>(&'a mut self, cmd: &'a redis::Cmd) -> RedisFuture<'a, Value> { - (async move { - let mut retry_count = 0u64; - loop { - let res = self.conn.req_packed_command(cmd).await; - if res.is_ok() || retry_count >= self.retry_count { - return res; - } - warn!("Redis failed command, retrying..."); - retry_count += 1; - } - }) - .boxed() - } - - fn req_packed_commands<'a>( - &'a mut self, - cmd: &'a Pipeline, - offset: usize, - count: usize, - ) -> RedisFuture<'a, Vec> { - (async move { - let mut retry_count = 0u64; - loop { - let res = self.conn.req_packed_commands(cmd, offset, count).await; - if res.is_ok() || retry_count >= self.retry_count { - return res; - } - warn!("Redis failed command, retrying..."); - retry_count += 1; - } - }) - .boxed() - } - - fn get_db(&self) -> i64 { - self.client.get_connection_info().redis.db - } -} diff --git a/watchdog/Cargo.toml b/watchdog/Cargo.toml index 34a6fd0f..ab5f301e 100644 --- a/watchdog/Cargo.toml +++ b/watchdog/Cargo.toml @@ -4,15 +4,16 @@ version = "0.1.0" edition = "2021" [dependencies] -redis-retry = { path = "../redis-retry" } +anyhow.workspace = true chrono.workspace = true -mongodb.workspace = true -envconfig.workspace = true dotenvy.workspace = true -tracing-subscriber.workspace = true -tokio.workspace = true -tracing.workspace = true -integrationos-domain.workspace = true -anyhow.workspace = true +envconfig.workspace = true futures.workspace = true +integrationos-domain.workspace = true +mongodb.workspace = true +redis.workspace = true serde_json.workspace = true +tokio.workspace = true +tracing-subscriber.workspace = true +tracing.workspace = true + diff --git a/watchdog/src/config.rs b/watchdog/src/config.rs index b6a42ff2..3de725cc 100644 --- a/watchdog/src/config.rs +++ b/watchdog/src/config.rs @@ -1,8 +1,8 @@ -use envconfig::Envconfig; -use integrationos_domain::common::database::DatabaseConfig; -use redis_retry::Config as RedisConfig; use std::fmt::{Display, Formatter}; +use envconfig::Envconfig; +use integrationos_domain::{cache::CacheConfig, common::database::DatabaseConfig}; + #[derive(Envconfig, Clone)] // Intentionally no Debug so secret is not printed pub struct Config { #[envconfig(from = "EVENT_TIMEOUT", default = "300")] // 300 seconds/ 5 minutes @@ -10,7 +10,7 @@ pub struct Config { #[envconfig(from = "POLL_DURATION", default = "10")] // 10 seconds pub poll_duration: u64, #[envconfig(nested = true)] - pub redis: RedisConfig, + pub redis: CacheConfig, #[envconfig(nested = true)] pub db: DatabaseConfig, } diff --git a/watchdog/src/main.rs b/watchdog/src/main.rs index 8058487a..367da520 100644 --- a/watchdog/src/main.rs +++ b/watchdog/src/main.rs @@ -4,20 +4,18 @@ use dotenvy::dotenv; use envconfig::Envconfig; use futures::{future::join_all, TryStreamExt}; use integrationos_domain::{ - algebra::adapter::StoreAdapter, + algebra::{MongoStore, RedisCache, StoreExt}, common::{ - event_with_context::EventWithContext, - mongo::{MongoDbStore, MongoDbStoreConfig}, - pipeline_context::Stage as PipelineStage, - root_context::Stage, - Event, ExtractorContext, PipelineContext, RootContext, Store, + event_with_context::EventWithContext, ExtractorContext, PipelineContext, RootContext, Store, }, + pipeline_context::PipelineStage, + root_context::RootStage, }; use mongodb::{ bson::{doc, Bson, Document}, options::FindOneOptions, }; -use redis_retry::{AsyncCommands, LposOptions, Redis, RedisResult}; +use redis::{AsyncCommands, LposOptions, RedisResult}; use std::time::Duration; use tracing::{debug, error, info, metadata::LevelFilter, warn}; use tracing_subscriber::EnvFilter; @@ -36,7 +34,7 @@ async fn main() -> Result<()> { info!("Starting watchdog with config: {config}"); - let mut redis = Redis::new(&config.redis).await?; + let mut redis = RedisCache::new(&config.redis, 100).await?; let key = config.redis.event_throughput_key.clone(); let mut redis_clone = redis.clone(); @@ -70,7 +68,7 @@ async fn main() -> Result<()> { .with_context(|| "Could not connect to events db")?; let event_db = event_client.database(&config.db.event_db_name); - let event_store = MongoDbStore::new(MongoDbStoreConfig::::new(event_db, Store::Events)) + let event_store = MongoStore::new(&event_db, &Store::Events) .await .with_context(|| { format!( @@ -167,7 +165,7 @@ async fn main() -> Result<()> { continue; }; - if let Stage::ProcessingPipelines(ref mut pipelines) = root_context.stage { + if let RootStage::ProcessingPipelines(ref mut pipelines) = root_context.stage { let futs = pipelines.values().map(|p| { pipeline_coll.find_one( doc! {