From 184ae7cba3c9b256e30967cf45d96fd29ecc4b7b Mon Sep 17 00:00:00 2001 From: Samuel Gomez Date: Thu, 4 Jul 2024 17:53:00 +0100 Subject: [PATCH] chore: adding environment to unification body parsing --- integrationos-api/src/config.rs | 7 +- integrationos-api/src/endpoints/unified.rs | 12 ++-- .../src/domain/configuration/environment.rs | 9 +++ integrationos-unified/src/unified.rs | 64 +++++++++++-------- 4 files changed, 56 insertions(+), 36 deletions(-) diff --git a/integrationos-api/src/config.rs b/integrationos-api/src/config.rs index ed83709e..7e0805dc 100644 --- a/integrationos-api/src/config.rs +++ b/integrationos-api/src/config.rs @@ -4,7 +4,7 @@ use std::{ }; use envconfig::Envconfig; -use integrationos_domain::cache::CacheConfig; +use integrationos_domain::{cache::CacheConfig, environment::Environment}; use integrationos_domain::{ database::DatabaseConfig, openai::OpenAiConfig, secrets::SecretsConfig, }; @@ -85,6 +85,8 @@ pub struct Config { pub cache_config: CacheConfig, #[envconfig(from = "RATE_LIMIT_ENABLED", default = "true")] pub rate_limit_enabled: bool, + #[envconfig(from = "ENVIRONMENT", default = "development")] + pub environment: Environment, } impl Display for Config { @@ -146,7 +148,8 @@ impl Display for Config { writeln!(f, "{}", self.db_config)?; writeln!(f, "{}", self.openai_config)?; writeln!(f, "{}", self.cache_config)?; - writeln!(f, "RATE_LIMIT_ENABLED: {}", self.rate_limit_enabled) + writeln!(f, "RATE_LIMIT_ENABLED: {}", self.rate_limit_enabled)?; + writeln!(f, "ENVIRONMENT: {}", self.environment) } } diff --git a/integrationos-api/src/endpoints/unified.rs b/integrationos-api/src/endpoints/unified.rs index 38471f11..dec66dbe 100644 --- a/integrationos-api/src/endpoints/unified.rs +++ b/integrationos-api/src/endpoints/unified.rs @@ -10,12 +10,9 @@ use bson::doc; use convert_case::{Case, Casing}; use http::{HeaderMap, HeaderName}; use integrationos_domain::{ - ApplicationError, InternalError, - { - connection_model_definition::CrudAction, destination::Action, - encrypted_access_key::EncryptedAccessKey, encrypted_data::PASSWORD_LENGTH, - event_access::EventAccess, AccessKey, Event, - }, + connection_model_definition::CrudAction, destination::Action, + encrypted_access_key::EncryptedAccessKey, encrypted_data::PASSWORD_LENGTH, environment, + event_access::EventAccess, AccessKey, ApplicationError, Event, InternalError, }; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; @@ -223,12 +220,15 @@ pub async fn process_request( connection.platform, connection.platform_version, model_name, action_name, ); + // let environment = state.config.connection_definition_cache_ttl_secs + let mut response = state .extractor_caller .send_to_destination_unified( connection.clone(), action, include_passthrough, + state.config.environment, headers, query_params, body, diff --git a/integrationos-domain/src/domain/configuration/environment.rs b/integrationos-domain/src/domain/configuration/environment.rs index 7d3a12c8..97312732 100644 --- a/integrationos-domain/src/domain/configuration/environment.rs +++ b/integrationos-domain/src/domain/configuration/environment.rs @@ -15,6 +15,15 @@ pub enum Environment { Production, } +impl Environment { + pub fn is_production(&self) -> bool { + match self { + Environment::Production | Environment::Live => true, + _ => false, + } + } +} + impl TryFrom<&str> for Environment { type Error = IntegrationOSError; diff --git a/integrationos-unified/src/unified.rs b/integrationos-unified/src/unified.rs index 135760ab..3af74b9e 100644 --- a/integrationos-unified/src/unified.rs +++ b/integrationos-unified/src/unified.rs @@ -24,6 +24,7 @@ use integrationos_domain::{ connection_model_schema::ConnectionModelSchema, database::DatabaseConfig, destination::{Action, Destination}, + environment::{self, Environment}, error::InternalError, get_secret_request::GetSecretRequest, hashed_secret::HashedSecret, @@ -221,6 +222,7 @@ impl UnifiedDestination { connection: Arc, action: Action, include_passthrough: bool, + environment: Environment, mut headers: HeaderMap, mut query_params: HashMap, mut body: Option, @@ -675,37 +677,43 @@ impl UnifiedDestination { error!("Could not select body at response path {path}: {e}"); ApplicationError::bad_request(&e.to_string(), None) })?; - if bodies.is_empty() { - let error_string = format!( + if environment.is_production() + && matches!(config.action_name, CrudAction::GetMany | CrudAction::GetOne) + { + if bodies.is_empty() { + let error_string = format!( "Could not map unified model. 3rd party Connection returned an invalid response. Expected model at path {path} but found none.", ); - let mut res = Response::builder() - .status(StatusCode::UNPROCESSABLE_ENTITY) - .body(json!({ - "message": error_string, - "passthrough": wrapped_body - })) - .map_err(|e| { - error!("Could not create response from builder for missing body"); - IntegrationOSError::from_err_code( - StatusCode::UNPROCESSABLE_ENTITY, - &e.to_string(), - None, - ) - })?; - *res.headers_mut() = headers; - return Ok(res); - } - if bodies.len() != 1 { - return Err(InternalError::invalid_argument( - &format!( - "Invalid number of selected bodies ({}) at response path {path}", - bodies.len() - ), - None, - )); + let mut res = Response::builder() + .status(StatusCode::UNPROCESSABLE_ENTITY) + .body(json!({ + "message": error_string, + "passthrough": wrapped_body + })) + .map_err(|e| { + error!("Could not create response from builder for missing body"); + IntegrationOSError::from_err_code( + StatusCode::UNPROCESSABLE_ENTITY, + &e.to_string(), + None, + ) + })?; + *res.headers_mut() = headers; + return Ok(res); + } + if bodies.len() != 1 { + return Err(InternalError::invalid_argument( + &format!( + "Invalid number of selected bodies ({}) at response path {path}", + bodies.len() + ), + None, + )); + } + Some(bodies.remove(0).clone()) + } else { + None } - Some(bodies.remove(0).clone()) } else { None };