Skip to content

Commit

Permalink
chore: adding crud action and model name for analytics
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez committed Jul 9, 2024
1 parent 44eb2ee commit 6233d66
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 6 deletions.
83 changes: 83 additions & 0 deletions integrationos-api/src/endpoints/schema_generator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use super::ReadResponse;
use crate::server::AppState;
use axum::{
extract::{Path, State},
routing::get,
Json, Router,
};
use bson::{doc, Document};
use futures::StreamExt;
use integrationos_domain::{ApplicationError, Id, IntegrationOSError, InternalError, Store};
use mongodb::options::FindOptions;
use std::sync::Arc;

pub fn get_router() -> Router<Arc<AppState>> {
Router::new()
.route("/projection", get(get_common_model_proj))
.route("/:id", get(generate_schema))
}

pub async fn get_common_model_proj(
state: State<Arc<AppState>>,
) -> Result<Json<ReadResponse<Document>>, IntegrationOSError> {
let collection = state
.app_stores
.db
.collection::<Document>(&Store::CommonModels.to_string());

let filter = doc! {
"deleted": false,
"primary": true,
"active": true,
};
let options = FindOptions::builder().projection(doc! { "_id": 1 }).build();

let mut cursor = collection.find(filter, options).await?;
let mut common_models: Vec<Document> = Vec::new();

while let Some(result) = cursor.next().await {
match result {
Ok(document) => {
common_models.push(document);
}
_ => {
return Err(IntegrationOSError::from(InternalError::unknown(
"Error while fetching common models",
None,
)));
}
}
}

let len = common_models.len();

Ok(Json(ReadResponse {
rows: common_models,
total: len as u64,
skip: 0,
limit: 0,
}))
}

pub async fn generate_schema(
state: State<Arc<AppState>>,
Path(id): Path<Id>,
) -> Result<String, IntegrationOSError> {
let cm_store = state.app_stores.common_model.clone();
let ce_store = state.app_stores.common_enum.clone();

let common_model = cm_store
.get_one_by_id(&id.to_string())
.await
.map_err(|e| IntegrationOSError::from(e))?
.ok_or(ApplicationError::not_found(
&format!("CommonModel with id {} not found", id),
None,
))?;

let schema = common_model
.as_typescript_schema_expanded(&cm_store, &ce_store)
.await;

Ok(schema)
}
6 changes: 3 additions & 3 deletions integrationos-api/src/endpoints/unified.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{get_connection, INTEGRATION_OS_PASSTHROUGH_HEADER};
use super::{common_model, get_connection, INTEGRATION_OS_PASSTHROUGH_HEADER};
use crate::{config::Headers, metrics::Metric, server::AppState};
use axum::{
extract::{Path, Query, State},
Expand Down Expand Up @@ -226,7 +226,7 @@ pub async fn process_request(
.extractor_caller
.send_to_destination_unified(
connection.clone(),
action,
action.clone(),
include_passthrough,
state.config.environment,
headers,
Expand Down Expand Up @@ -303,7 +303,7 @@ pub async fn process_request(
}
};

let metric = Metric::unified(connection.clone());
let metric = Metric::unified(connection.clone(), action.clone());
if let Err(e) = state.metric_tx.send(metric).await {
error!("Could not send metric to receiver: {e}");
}
Expand Down
14 changes: 11 additions & 3 deletions integrationos-api/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use chrono::{DateTime, Datelike, Utc};
use http::HeaderValue;
use integrationos_domain::{event_access::EventAccess, ownership::Ownership, Connection};
use integrationos_domain::{
destination::Action, event_access::EventAccess, ownership::Ownership, Connection,
};
use segment::message::{Track, User};
use serde::Deserialize;
use serde_json::json;
Expand Down Expand Up @@ -39,27 +41,31 @@ impl MetricType {
pub struct Metric {
pub metric_type: MetricType,
pub date: DateTime<Utc>,
pub action: Option<Action>,
}

impl Metric {
pub fn passthrough(connection: Arc<Connection>) -> Self {
Self {
metric_type: MetricType::Passthrough(connection),
date: Utc::now(),
action: None,
}
}

pub fn unified(connection: Arc<Connection>) -> Self {
pub fn unified(connection: Arc<Connection>, action: Action) -> Self {
Self {
metric_type: MetricType::Unified(connection),
date: Utc::now(),
action: Some(action),
}
}

pub fn rate_limited(event_access: Arc<EventAccess>, key: Option<HeaderValue>) -> Self {
Self {
metric_type: MetricType::RateLimited(event_access, key),
date: Utc::now(),
action: None,
}
}

Expand Down Expand Up @@ -123,7 +129,9 @@ impl Metric {
"platform": self.platform(),
"platformVersion": &conn.platform_version,
"clientId": self.ownership().client_id,
"version": &conn.record_metadata.version
"version": &conn.record_metadata.version,
"commonModel": self.action.as_ref().map(|a| a.name()),
"action": self.action.as_ref().map(|a| a.action()),
}),
..Default::default()
},
Expand Down
16 changes: 16 additions & 0 deletions integrationos-domain/src/domain/pipeline/destination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ pub enum Action {
},
}

impl Action {
pub fn name(&self) -> &str {
match self {
Action::Passthrough { path, .. } => path,
Action::Unified { name, .. } => name,
}
}

pub fn action(&self) -> Option<&CrudAction> {
match self {
Action::Passthrough { .. } => None,
Action::Unified { action, .. } => Some(action),
}
}
}

#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
#[cfg_attr(feature = "dummy", derive(fake::Dummy))]
#[serde(rename_all = "camelCase")]
Expand Down

0 comments on commit 6233d66

Please sign in to comment.