Skip to content

Commit

Permalink
chore: always send metadata to event bus (#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Jul 18, 2024
1 parent fc2bdb4 commit bf78451
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 36 deletions.
4 changes: 2 additions & 2 deletions integrationos-api/src/endpoints/connection_definition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ impl RequestExt for CreateRequest {
test_connection: self.test_connection,
auth_secrets,
auth_method: self.auth_method.clone(),
multi_env: self.multi_env.clone(),
multi_env: self.multi_env,
paths: self.paths.clone(),
settings: self.settings.clone(),
hidden: false,
Expand All @@ -332,7 +332,7 @@ impl RequestExt for CreateRequest {
record.frontend.spec.tags.clone_from(&self.tags);
record.test_connection = self.test_connection;
record.platform.clone_from(&self.platform);
record.multi_env = self.multi_env.clone();
record.multi_env = self.multi_env;
record.record_metadata.active = self.active;
record
}
Expand Down
19 changes: 14 additions & 5 deletions integrationos-api/src/endpoints/unified.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use convert_case::{Case, Casing};
use http::{HeaderMap, HeaderName};
use integrationos_domain::{
connection_model_definition::CrudAction, destination::Action,
encrypted_access_key::EncryptedAccessKey, encrypted_data::PASSWORD_LENGTH, environment,
encrypted_access_key::EncryptedAccessKey, encrypted_data::PASSWORD_LENGTH,
event_access::EventAccess, AccessKey, ApplicationError, Event, InternalError,
};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -57,6 +57,8 @@ pub async fn get_request(
.await
}

const META: &str = "meta";

pub async fn update_request(
event_access: Extension<Arc<EventAccess>>,
state: State<Arc<AppState>>,
Expand Down Expand Up @@ -242,7 +244,8 @@ pub async fn process_request(
e
})?;

*response.headers_mut() = response
*response.response.headers_mut() = response
.response
.headers()
.iter()
.map(|(key, value)| {
Expand All @@ -253,7 +256,7 @@ pub async fn process_request(
})
.collect::<HeaderMap>();

let (parts, body) = response.into_parts();
let (parts, body) = response.response.into_parts();

if let Some(Ok(encrypted_access_key)) =
access_key_header_value.map(|v| v.to_str().map(|s| s.to_string()))
Expand All @@ -276,9 +279,15 @@ pub async fn process_request(
error!("Could not decrypt access key: {e}");
InternalError::decryption_error("Could not decrypt access key", None)
})?;
const META: &str = "meta";
let status_code = parts.status.as_u16();

let mut metadata = body.get(META).unwrap_or(&response.metadata).clone();
if let Some(meta) = metadata.as_object_mut() {
meta.insert("status_code".to_string(), json!(status_code));
};

let body = serde_json::to_string(&json!({
META: body.get(META)
META: metadata,
}))
.map_err(|e| {
error!("Could not serialize meta body to string: {e}");
Expand Down
1 change: 1 addition & 0 deletions integrationos-api/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl Metric {

pub fn segment_track(&self) -> Track {
use MetricType::*;

match &self.metric_type {
Unified(conn) => Track {
user: User::UserId {
Expand Down
5 changes: 1 addition & 4 deletions integrationos-domain/src/domain/configuration/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ pub enum Environment {

impl Environment {
pub fn is_production(&self) -> bool {
match self {
Environment::Production | Environment::Live => true,
_ => false,
}
matches!(self, Environment::Production | Environment::Live)
}
}

Expand Down
69 changes: 44 additions & 25 deletions integrationos-unified/src/unified.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,19 @@ use mongodb::{
options::{Collation, CollationStrength, FindOneOptions},
Client,
};
use serde_json::{json, Map, Number, Value};
use serde_json::{json, Number, Value};
use std::{cell::RefCell, collections::HashMap, str::FromStr, sync::Arc};
use tracing::{debug, error};

thread_local! {
static JS_RUNTIME: RefCell<Script> = RefCell::new(Script::new());
}

pub struct UnifiedResponse {
pub response: Response<Value>,
pub metadata: Value,
}

#[derive(Clone)]
pub struct UnifiedDestination {
pub connections_cache: ConnectionCacheArcStrKey,
Expand Down Expand Up @@ -226,7 +231,7 @@ impl UnifiedDestination {
mut headers: HeaderMap,
mut query_params: HashMap<String, String>,
mut body: Option<Value>,
) -> Result<Response<Value>, IntegrationOSError> {
) -> Result<UnifiedResponse, IntegrationOSError> {
let key = Destination {
platform: connection.platform.clone(),
action: action.clone(),
Expand Down Expand Up @@ -335,6 +340,25 @@ impl UnifiedDestination {
schema_id.to_string().replace([':', '-'], "_")
};

let mut metadata = json!({
"timestamp": Utc::now().timestamp_millis(),
"platformRateLimitRemaining": 0,
"rateLimitRemaining": 0,
"host": headers.get("host").map(|v| v.to_str().unwrap_or("")),
"cache": {
"hit": false,
"ttl": 0,
"key": ""
},
"transactionKey": Id::now(IdPrefix::Transaction),
"platform": connection.platform,
"platformVersion": connection.platform_version,
"action": config.action_name,
"commonModel": config.mapping.as_ref().map(|m| &m.common_model_name),
"commonModelVersion": "v1",
"connectionKey": connection.key,
});

body = if let Some(body) = body {
if let Some(js) = mapping.as_ref().map(|m| m.from_common_model.as_str()) {
debug!(
Expand Down Expand Up @@ -552,7 +576,10 @@ impl UnifiedDestination {
IntegrationOSError::from_err_code(status, &e.to_string(), None)
})?;
*res.headers_mut() = headers;
return Ok(res);
return Ok(UnifiedResponse {
metadata: metadata.clone(),
response: res,
});
}

let status = res.status();
Expand Down Expand Up @@ -701,7 +728,10 @@ impl UnifiedDestination {
)
})?;
*res.headers_mut() = headers;
return Ok(res);
return Ok(UnifiedResponse {
metadata: metadata.clone(),
response: res,
});
}

if bodies.len() != 1 && is_returning_error {
Expand Down Expand Up @@ -900,27 +930,13 @@ impl UnifiedDestination {
}

if let Value::Object(ref mut response) = &mut response {
let meta = json!({
"timestamp": Utc::now().timestamp_millis(),
"latency": latency,
"platformRateLimitRemaining": 0,
"rateLimitRemaining": 0,
"cache": {
"hit": false,
"ttl": 0,
"key": ""
},
"transactionKey": Id::now(IdPrefix::Transaction),
"platform": connection.platform,
"platformVersion": connection.platform_version,
"action": config.action_name,
"commonModel": config.mapping.as_ref().map(|m| &m.common_model_name),
"commonModelVersion": "v1",
"connectionKey": connection.key,
"hash": hash.inner(),
});
if let Some(meta) = metadata.as_object_mut() {
meta.insert("latency".to_string(), Value::Number(Number::from(latency)));
meta.insert("hash".to_string(), Value::String(hash.inner().into()));
}

const META: &str = "meta";
response.insert(META.to_string(), meta);
response.insert(META.to_string(), metadata.clone());
}

let mut builder = Response::builder();
Expand All @@ -947,7 +963,10 @@ impl UnifiedDestination {
IntegrationOSError::from_err_code(status, &e.to_string(), None)
})?;

Ok(res)
Ok(UnifiedResponse {
metadata: metadata.clone(),
response: res,
})
}

pub async fn send_to_destination(
Expand Down

0 comments on commit bf78451

Please sign in to comment.