diff --git a/integrationos-api/src/endpoints/unified.rs b/integrationos-api/src/endpoints/unified.rs index dec66dbe..902a0a3b 100644 --- a/integrationos-api/src/endpoints/unified.rs +++ b/integrationos-api/src/endpoints/unified.rs @@ -226,7 +226,7 @@ pub async fn process_request( .extractor_caller .send_to_destination_unified( connection.clone(), - action, + action.clone(), include_passthrough, state.config.environment, headers, @@ -303,7 +303,7 @@ pub async fn process_request( } }; - let metric = Metric::unified(connection.clone()); + let metric = Metric::unified(connection.clone(), action); if let Err(e) = state.metric_tx.send(metric).await { error!("Could not send metric to receiver: {e}"); } diff --git a/integrationos-api/src/metrics.rs b/integrationos-api/src/metrics.rs index 88e0c04c..84f25a16 100644 --- a/integrationos-api/src/metrics.rs +++ b/integrationos-api/src/metrics.rs @@ -1,6 +1,8 @@ use chrono::{DateTime, Datelike, Utc}; use http::HeaderValue; -use integrationos_domain::{event_access::EventAccess, ownership::Ownership, Connection}; +use integrationos_domain::{ + destination::Action, event_access::EventAccess, ownership::Ownership, Connection, +}; use segment::message::{Track, User}; use serde::Deserialize; use serde_json::json; @@ -39,6 +41,7 @@ impl MetricType { pub struct Metric { pub metric_type: MetricType, pub date: DateTime, + pub action: Option, } impl Metric { @@ -46,13 +49,15 @@ impl Metric { Self { metric_type: MetricType::Passthrough(connection), date: Utc::now(), + action: None, } } - pub fn unified(connection: Arc) -> Self { + pub fn unified(connection: Arc, action: Action) -> Self { Self { metric_type: MetricType::Unified(connection), date: Utc::now(), + action: Some(action), } } @@ -60,6 +65,7 @@ impl Metric { Self { metric_type: MetricType::RateLimited(event_access, key), date: Utc::now(), + action: None, } } @@ -123,7 +129,9 @@ impl Metric { "platform": self.platform(), "platformVersion": &conn.platform_version, "clientId": self.ownership().client_id, - "version": &conn.record_metadata.version + "version": &conn.record_metadata.version, + "commonModel": self.action.as_ref().map(|a| a.name()), + "action": self.action.as_ref().map(|a| a.action()), }), ..Default::default() }, diff --git a/integrationos-domain/src/domain/pipeline/destination.rs b/integrationos-domain/src/domain/pipeline/destination.rs index aae5cbbb..86783fd5 100644 --- a/integrationos-domain/src/domain/pipeline/destination.rs +++ b/integrationos-domain/src/domain/pipeline/destination.rs @@ -22,6 +22,22 @@ pub enum Action { }, } +impl Action { + pub fn name(&self) -> &str { + match self { + Action::Passthrough { path, .. } => path, + Action::Unified { name, .. } => name, + } + } + + pub fn action(&self) -> Option<&CrudAction> { + match self { + Action::Passthrough { .. } => None, + Action::Unified { action, .. } => Some(action), + } + } +} + #[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)] #[cfg_attr(feature = "dummy", derive(fake::Dummy))] #[serde(rename_all = "camelCase")]