diff --git a/api/src/endpoints/common_model.rs b/api/src/endpoints/common_model.rs index dda5bc6c..19005f60 100644 --- a/api/src/endpoints/common_model.rs +++ b/api/src/endpoints/common_model.rs @@ -96,12 +96,13 @@ impl RequestExt for CreateRequest { Some(record) } - fn update(&self, record: &mut Self::Output) { + fn update(&self, mut record: Self::Output) -> Self::Output { record.name = self.name.clone(); record.record_metadata.version = self.version.clone(); record.fields = self.fields.clone(); record.category = self.category.clone(); record.sample = self.sample.clone(); + record } fn get_store(stores: AppStores) -> MongoStore { diff --git a/api/src/endpoints/connection_definition.rs b/api/src/endpoints/connection_definition.rs index b2f42c92..4397bea4 100644 --- a/api/src/endpoints/connection_definition.rs +++ b/api/src/endpoints/connection_definition.rs @@ -1,5 +1,5 @@ use super::{ - create, delete, read, update, ApiResult, CachedRequest, HookExt, ReadResponse, RequestExt, Unit, + create, delete, read, update, ApiResult, CachedRequest, HookExt, ReadResponse, RequestExt, }; use crate::{ internal_server_error, not_found, @@ -319,7 +319,7 @@ impl RequestExt for CreateRequest { Some(record) } - fn update(&self, record: &mut Self::Output) -> Unit { + fn update(&self, mut record: Self::Output) -> Self::Output { record.name = self.name.clone(); record.frontend.spec.description = self.description.clone(); record.frontend.spec.category = self.category.clone(); @@ -328,6 +328,7 @@ impl RequestExt for CreateRequest { record.test_connection = self.test_connection; record.platform = self.platform.clone(); record.record_metadata.active = self.active; + record } fn get_store(stores: AppStores) -> MongoStore { diff --git a/api/src/endpoints/connection_model_definition.rs b/api/src/endpoints/connection_model_definition.rs index db2932b8..4ea8d1ac 100644 --- a/api/src/endpoints/connection_model_definition.rs +++ b/api/src/endpoints/connection_model_definition.rs @@ -1,4 +1,4 @@ -use super::{create, delete, read, update, HookExt, RequestExt, Unit}; +use super::{create, delete, read, update, HookExt, RequestExt}; use crate::{ api_payloads::ErrorResponse, internal_server_error, not_found, @@ -348,7 +348,7 @@ impl RequestExt for CreateRequest { Some(record) } - fn update(&self, record: &mut Self::Output) -> Unit { + fn update(&self, mut record: Self::Output) -> Self::Output { let key = format!( "api::{}::{}::{}::{}::{}::{}", self.connection_platform, @@ -383,6 +383,8 @@ impl RequestExt for CreateRequest { record.mapping = self.mapping.clone(); record.extractor_config = self.extractor_config.clone(); record.record_metadata.version = self.version.clone(); + + record } fn get_store(stores: AppStores) -> MongoStore { diff --git a/api/src/endpoints/connection_model_schema.rs b/api/src/endpoints/connection_model_schema.rs index 7c0d7bec..6850edfd 100644 --- a/api/src/endpoints/connection_model_schema.rs +++ b/api/src/endpoints/connection_model_schema.rs @@ -112,7 +112,7 @@ impl RequestExt for CreateRequest { }) } - fn update(&self, record: &mut Self::Output) { + fn update(&self, mut record: Self::Output) -> Self::Output { record.platform_id = self.platform_id; record.platform_page_id = self.platform_page_id; record.connection_platform = self.connection_platform.clone(); @@ -123,6 +123,7 @@ impl RequestExt for CreateRequest { record.sample = self.sample.clone(); record.paths = self.paths.clone(); record.mapping = self.mapping.clone(); + record } fn get_store(stores: AppStores) -> MongoStore { diff --git a/api/src/endpoints/connection_oauth_definition.rs b/api/src/endpoints/connection_oauth_definition.rs index cbe4fb34..f1c4986d 100644 --- a/api/src/endpoints/connection_oauth_definition.rs +++ b/api/src/endpoints/connection_oauth_definition.rs @@ -1,4 +1,4 @@ -use super::{create, delete, read, update, CachedRequest, HookExt, ReadResponse, RequestExt, Unit}; +use super::{create, delete, read, update, CachedRequest, HookExt, ReadResponse, RequestExt}; use crate::server::{AppState, AppStores}; use axum::{ routing::{patch, post}, @@ -125,7 +125,7 @@ impl RequestExt for CreateRequest { }) } - fn update(&self, record: &mut Self::Output) -> Unit { + fn update(&self, mut record: Self::Output) -> Self::Output { record.connection_platform = self.connection_platform.clone(); record.configuration = OAuthApiConfig { init: self.init.configuration.clone(), @@ -180,6 +180,8 @@ impl RequestExt for CreateRequest { }; record.record_metadata.updated_at = Utc::now().timestamp_millis(); record.record_metadata.updated = true; + + record } fn get_store(stores: AppStores) -> MongoStore { diff --git a/api/src/endpoints/mod.rs b/api/src/endpoints/mod.rs index 254d1ca4..9079bed9 100644 --- a/api/src/endpoints/mod.rs +++ b/api/src/endpoints/mod.rs @@ -51,7 +51,6 @@ pub type InMemoryCache = Arc>, Arc>> pub trait RequestExt: Sized { type Output: Serialize + DeserializeOwned + Unpin + Sync + Send + 'static; - /// Generate `Self::Output` of the request based on the given payload. /// /// @param self @@ -69,10 +68,10 @@ pub trait RequestExt: Sized { None } - /// Update the output of the request based on the input. - fn update(&self, _: &mut Self::Output) -> Unit {} + fn update(&self, output: Self::Output) -> Self::Output { + output + } - /// Get the store for the request. fn get_store(stores: AppStores) -> MongoStore; } @@ -285,7 +284,7 @@ where let store = T::get_store(state.app_stores.clone()); - let Some(mut record) = (match store.get_one(query.filter).await { + let Some(record) = (match store.get_one(query.filter).await { Ok(ret) => ret, Err(e) => { error!("Error getting record in store: {e}"); @@ -295,7 +294,7 @@ where return Err(not_found!("Record")); }; - req.update(&mut record); + let record = req.update(record); let bson = bson::to_bson_with_options( &record, diff --git a/api/src/endpoints/pipeline.rs b/api/src/endpoints/pipeline.rs index 6efd49d8..ca6b3c16 100644 --- a/api/src/endpoints/pipeline.rs +++ b/api/src/endpoints/pipeline.rs @@ -64,7 +64,7 @@ impl RequestExt for CreatePipelineRequest { }) } - fn update(&self, record: &mut Self::Output) { + fn update(&self, mut record: Self::Output) -> Self::Output { let CreatePipelineRequest { name, key, @@ -83,6 +83,8 @@ impl RequestExt for CreatePipelineRequest { record.signature = signature.clone(); record.config = Some(config.clone()); record.record_metadata.mark_updated(&record.ownership.id); + + record } fn get_store(stores: AppStores) -> MongoStore { diff --git a/api/src/endpoints/platform.rs b/api/src/endpoints/platform.rs index 3dfa0991..0211d266 100644 --- a/api/src/endpoints/platform.rs +++ b/api/src/endpoints/platform.rs @@ -57,13 +57,15 @@ impl RequestExt for CreateRequest { }) } - fn update(&self, record: &mut Self::Output) { + fn update(&self, mut record: Self::Output) -> Self::Output { record.connection_definition_id = self.connection_definition_id; record.name = self.name.clone(); record.url = self.url.clone(); record.platform_version = self.version.clone(); record.ownership = self.ownership.clone(); record.analyzed = self.analyzed; + + record } fn get_store(stores: AppStores) -> MongoStore { diff --git a/api/src/endpoints/platform_page.rs b/api/src/endpoints/platform_page.rs index c983fa27..742a394c 100644 --- a/api/src/endpoints/platform_page.rs +++ b/api/src/endpoints/platform_page.rs @@ -118,7 +118,7 @@ impl RequestExt for CreateRequest { }) } - fn update(&self, record: &mut Self::Output) { + fn update(&self, mut record: Self::Output) -> Self::Output { record.platform_id = self.platform_id; record.connection_definition_id = self.connection_definition_id; record.r#type = self.r#type.clone(); @@ -127,6 +127,8 @@ impl RequestExt for CreateRequest { record.content = self.content.clone(); record.ownership = self.ownership.clone(); record.analyzed = self.analyzed; + + record } fn get_store(stores: AppStores) -> MongoStore { diff --git a/api/src/routes/authenticated.rs b/api/src/routes/authenticated.rs index 3f65efad..0927e738 100644 --- a/api/src/routes/authenticated.rs +++ b/api/src/routes/authenticated.rs @@ -1,7 +1,7 @@ use crate::{ endpoints::{ common_model, connection_definition, - connection_model_definition::{self, test_connection_model_definition}, + connection_model_definition::{self}, connection_model_schema, connection_oauth_definition, openapi, platform, platform_page, }, middleware::jwt_auth::{self, JwtState}, @@ -13,10 +13,6 @@ use tower_http::trace::TraceLayer; pub async fn get_router(state: &Arc) -> Router> { let routes = Router::new() - .route( - "/connection-model-definitions/test/:id", - post(test_connection_model_definition), - ) .nest( "/connection-definitions", connection_definition::get_router(), diff --git a/api/src/routes/public.rs b/api/src/routes/public.rs index f94ff2e7..4ce9efe9 100644 --- a/api/src/routes/public.rs +++ b/api/src/routes/public.rs @@ -1,6 +1,7 @@ use crate::{ endpoints::{ - connection_definition, connection_model_schema, connection_oauth_definition, + connection_definition, connection_model_definition::test_connection_model_definition, + connection_model_schema, connection_oauth_definition, event_access::create_event_access_for_new_user, openapi, read_cached, }, middleware::jwt_auth::{self, JwtState}, @@ -24,6 +25,10 @@ pub fn get_router(state: &Arc) -> Router> { jwt_auth::jwt_auth, )), ) + .route( + "/connection-model-definitions/test/:id", + post(test_connection_model_definition), + ) .route( "/connection-definitions", get(read_cached::), diff --git a/api/src/server.rs b/api/src/server.rs index c911e788..f05c9c1e 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -2,7 +2,7 @@ use crate::{ config::Config, endpoints::{ connection_oauth_definition::FrontendOauthConnectionDefinition, openapi::OpenAPIData, - ReadResponse, + InMemoryCache, ReadResponse, }, metrics::Metric, routes, @@ -27,7 +27,7 @@ use integrationos_domain::{ use moka::future::Cache; use mongodb::{options::UpdateOptions, Client, Database}; use segment::{AutoBatcher, Batcher, HttpClient}; -use std::{collections::BTreeMap, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use tokio::{net::TcpListener, sync::mpsc::Sender, time::timeout, try_join}; use tracing::{error, info, trace, warn}; @@ -63,14 +63,9 @@ pub struct AppState { pub openapi_data: OpenAPIData, pub http_client: reqwest::Client, pub connections_cache: Cache<(Arc, HeaderValue), Arc>, - pub connection_definitions_cache: - Arc>, Arc>>>, - pub connection_oauth_definitions_cache: Arc< - Cache< - Option>, - Arc>, - >, - >, + pub connection_definitions_cache: InMemoryCache>, + pub connection_oauth_definitions_cache: + InMemoryCache>, pub secrets_client: Arc, pub extractor_caller: UnifiedDestination, pub event_tx: Sender,