diff --git a/integrationos-api/src/endpoints/schema_generator.rs b/integrationos-api/src/endpoints/schema_generator.rs new file mode 100644 index 00000000..855629b4 --- /dev/null +++ b/integrationos-api/src/endpoints/schema_generator.rs @@ -0,0 +1,83 @@ +use super::ReadResponse; +use crate::server::AppState; +use axum::{ + extract::{Path, State}, + routing::get, + Json, Router, +}; +use bson::{doc, Document}; +use futures::StreamExt; +use integrationos_domain::{ApplicationError, Id, IntegrationOSError, InternalError, Store}; +use mongodb::options::FindOptions; +use std::sync::Arc; + +pub fn get_router() -> Router> { + Router::new() + .route("/projection", get(get_common_model_proj)) + .route("/:id", get(generate_schema)) +} + +pub async fn get_common_model_proj( + state: State>, +) -> Result>, IntegrationOSError> { + let collection = state + .app_stores + .db + .collection::(&Store::CommonModels.to_string()); + + let filter = doc! { + "deleted": false, + "primary": true, + "active": true, + }; + let options = FindOptions::builder().projection(doc! { "_id": 1 }).build(); + + let mut cursor = collection.find(filter, options).await?; + let mut common_models: Vec = Vec::new(); + + while let Some(result) = cursor.next().await { + match result { + Ok(document) => { + common_models.push(document); + } + _ => { + return Err(IntegrationOSError::from(InternalError::unknown( + "Error while fetching common models", + None, + ))); + } + } + } + + let len = common_models.len(); + + Ok(Json(ReadResponse { + rows: common_models, + total: len as u64, + skip: 0, + limit: 0, + })) +} + +pub async fn generate_schema( + state: State>, + Path(id): Path, +) -> Result { + let cm_store = state.app_stores.common_model.clone(); + let ce_store = state.app_stores.common_enum.clone(); + + let common_model = cm_store + .get_one_by_id(&id.to_string()) + .await + .map_err(|e| IntegrationOSError::from(e))? + .ok_or(ApplicationError::not_found( + &format!("CommonModel with id {} not found", id), + None, + ))?; + + let schema = common_model + .as_typescript_schema_expanded(&cm_store, &ce_store) + .await; + + Ok(schema) +} diff --git a/integrationos-api/src/endpoints/unified.rs b/integrationos-api/src/endpoints/unified.rs index dec66dbe..6d89dd63 100644 --- a/integrationos-api/src/endpoints/unified.rs +++ b/integrationos-api/src/endpoints/unified.rs @@ -1,4 +1,4 @@ -use super::{get_connection, INTEGRATION_OS_PASSTHROUGH_HEADER}; +use super::{common_model, get_connection, INTEGRATION_OS_PASSTHROUGH_HEADER}; use crate::{config::Headers, metrics::Metric, server::AppState}; use axum::{ extract::{Path, Query, State}, @@ -226,7 +226,7 @@ pub async fn process_request( .extractor_caller .send_to_destination_unified( connection.clone(), - action, + action.clone(), include_passthrough, state.config.environment, headers, @@ -303,7 +303,7 @@ pub async fn process_request( } }; - let metric = Metric::unified(connection.clone()); + let metric = Metric::unified(connection.clone(), action.clone()); if let Err(e) = state.metric_tx.send(metric).await { error!("Could not send metric to receiver: {e}"); } diff --git a/integrationos-api/src/metrics.rs b/integrationos-api/src/metrics.rs index 88e0c04c..84f25a16 100644 --- a/integrationos-api/src/metrics.rs +++ b/integrationos-api/src/metrics.rs @@ -1,6 +1,8 @@ use chrono::{DateTime, Datelike, Utc}; use http::HeaderValue; -use integrationos_domain::{event_access::EventAccess, ownership::Ownership, Connection}; +use integrationos_domain::{ + destination::Action, event_access::EventAccess, ownership::Ownership, Connection, +}; use segment::message::{Track, User}; use serde::Deserialize; use serde_json::json; @@ -39,6 +41,7 @@ impl MetricType { pub struct Metric { pub metric_type: MetricType, pub date: DateTime, + pub action: Option, } impl Metric { @@ -46,13 +49,15 @@ impl Metric { Self { metric_type: MetricType::Passthrough(connection), date: Utc::now(), + action: None, } } - pub fn unified(connection: Arc) -> Self { + pub fn unified(connection: Arc, action: Action) -> Self { Self { metric_type: MetricType::Unified(connection), date: Utc::now(), + action: Some(action), } } @@ -60,6 +65,7 @@ impl Metric { Self { metric_type: MetricType::RateLimited(event_access, key), date: Utc::now(), + action: None, } } @@ -123,7 +129,9 @@ impl Metric { "platform": self.platform(), "platformVersion": &conn.platform_version, "clientId": self.ownership().client_id, - "version": &conn.record_metadata.version + "version": &conn.record_metadata.version, + "commonModel": self.action.as_ref().map(|a| a.name()), + "action": self.action.as_ref().map(|a| a.action()), }), ..Default::default() }, diff --git a/integrationos-domain/src/domain/pipeline/destination.rs b/integrationos-domain/src/domain/pipeline/destination.rs index aae5cbbb..86783fd5 100644 --- a/integrationos-domain/src/domain/pipeline/destination.rs +++ b/integrationos-domain/src/domain/pipeline/destination.rs @@ -22,6 +22,22 @@ pub enum Action { }, } +impl Action { + pub fn name(&self) -> &str { + match self { + Action::Passthrough { path, .. } => path, + Action::Unified { name, .. } => name, + } + } + + pub fn action(&self) -> Option<&CrudAction> { + match self { + Action::Passthrough { .. } => None, + Action::Unified { action, .. } => Some(action), + } + } +} + #[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)] #[cfg_attr(feature = "dummy", derive(fake::Dummy))] #[serde(rename_all = "camelCase")]