Skip to content

Commit

Permalink
feat: implement tasks router (#238)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagojez authored Feb 4, 2025
1 parent c3370d8 commit 8b4781b
Show file tree
Hide file tree
Showing 14 changed files with 206 additions and 38 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api/src/logic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub mod platform;
pub mod platform_page;
pub mod schema_generator;
pub mod secrets;
pub mod tasks;
pub mod unified;
pub mod vault_connection;

Expand Down
60 changes: 60 additions & 0 deletions api/src/logic/tasks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use super::{create, delete, read, update, HookExt, PublicExt, RequestExt};
use crate::server::{AppState, AppStores};
use axum::{
routing::{patch, post},
Router,
};
use chrono::Utc;
use entities::{
event_access::EventAccess, prefix::IdPrefix, record_metadata::RecordMetadata, task::Task, Id,
};
use fake::Dummy;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;

pub fn get_router() -> Router<Arc<AppState>> {
Router::new()
.route(
"/",
post(create::<CreateRequest, Task>).get(read::<CreateRequest, Task>),
)
.route(
"/:id",
patch(update::<CreateRequest, Task>).delete(delete::<CreateRequest, Task>),
)
}

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, Dummy)]
#[serde(rename_all = "camelCase")]
pub struct CreateRequest {
pub start_time: i64,
pub endpoint: String,
pub payload: Value,
}

impl RequestExt for CreateRequest {
type Output = Task;

fn from(&self) -> Option<Task> {
Some(Task {
id: Id::now(IdPrefix::Task),
start_time: Utc::now().timestamp_millis(),
end_time: None,
payload: self.payload.clone(),
endpoint: self.endpoint.clone(),
status: None,
metadata: RecordMetadata::default(),
})
}

fn get_store(stores: AppStores) -> entities::MongoStore<Self::Output> {
stores.tasks
}

fn access(&self, _: Arc<EventAccess>) -> Option<Self::Output> {
self.from()
}
}
impl HookExt<Task> for CreateRequest {}
impl PublicExt<Task> for CreateRequest {}
3 changes: 2 additions & 1 deletion api/src/router/secured_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
connection_model_schema::{
public_get_connection_model_schema, PublicGetConnectionModelSchema,
},
event_access, events, knowledge, metrics, oauth, passthrough, secrets, unified,
event_access, events, knowledge, metrics, oauth, passthrough, secrets, tasks, unified,
vault_connection,
},
middleware::{
Expand Down Expand Up @@ -38,6 +38,7 @@ pub async fn get_router(state: &Arc<AppState>) -> Router<Arc<AppState>> {
.nest("/event-access", event_access::get_router())
.nest("/events", events::get_router())
.nest("/knowledge", knowledge::get_router())
.nest("/tasks", tasks::get_router())
.nest("/metrics", metrics::get_router())
.nest("/oauth", oauth::get_router())
.nest("/passthrough", passthrough::get_router())
Expand Down
8 changes: 6 additions & 2 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use entities::{
page::PlatformPage,
secret::Secret,
secrets::SecretServiceProvider,
task::Task,
user::UserClient,
Connection, Event, GoogleKms, IOSKms, PlatformData, PublicConnection, SecretExt, Store,
};
Expand Down Expand Up @@ -56,6 +57,7 @@ pub struct AppStores {
pub knowledge: MongoStore<Knowledge>,
pub secrets: MongoStore<Secret>,
pub settings: MongoStore<Settings>,
pub tasks: MongoStore<Task>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -83,8 +85,8 @@ pub struct Server {

impl Server {
pub async fn init(config: ConnectionsConfig) -> Result<Self> {
let client = Client::with_uri_str(&config.db_config.control_db_url).await?;
let db = client.database(&config.db_config.control_db_name);
let client = Client::with_uri_str(&config.db_config.event_db_url).await?;
let db = client.database(&config.db_config.event_db_name);

let http_client = reqwest::ClientBuilder::new()
.timeout(Duration::from_secs(config.http_client_timeout_secs))
Expand Down Expand Up @@ -112,6 +114,7 @@ impl Server {
let knowledge = MongoStore::new(&db, &Store::ConnectionModelDefinitions).await?;
let clients = MongoStore::new(&db, &Store::Clients).await?;
let secrets_store = MongoStore::<Secret>::new(&db, &Store::Secrets).await?;
let tasks = MongoStore::new(&db, &Store::Tasks).await?;

let secrets_client: Arc<dyn SecretExt + Sync + Send> = match config.secrets_config.provider
{
Expand Down Expand Up @@ -160,6 +163,7 @@ impl Server {
knowledge,
event,
clients,
tasks,
};

let event_access_cache =
Expand Down
3 changes: 2 additions & 1 deletion api/tests/http/crud.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::context::TestServer;
use api::logic::{common_model, ReadResponse};
use api::logic::{common_model, tasks, ReadResponse};
use api::logic::{connection_definition, connection_model_definition, connection_model_schema};
use entities::task::Task;
use entities::{
common_model::CommonModel, connection_definition::ConnectionDefinition,
connection_model_definition::ConnectionModelDefinition,
Expand Down
1 change: 1 addition & 0 deletions entities/src/domain/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod emitted_events;
pub mod event_access;
pub mod event_state;
pub mod hashes;
pub mod task;

use self::{
event_state::EventState,
Expand Down
19 changes: 19 additions & 0 deletions entities/src/domain/event/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use crate::{record_metadata::RecordMetadata, Id};
use http::StatusCode;
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Task {
#[serde(rename = "_id")]
pub id: Id,
pub start_time: i64,
pub end_time: Option<i64>,
pub payload: Value,
pub endpoint: String,
#[serde(with = "http_serde_ext_ios::status_code::option")]
pub status: Option<StatusCode>,
#[serde(flatten)]
pub metadata: RecordMetadata,
}
27 changes: 6 additions & 21 deletions entities/src/domain/id/prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub enum IdPrefix {
EventAccess,
EventDependency,
EventKey,
Idempotency,
Job,
JobStage,
LLMMessage,
Expand All @@ -30,13 +29,12 @@ pub enum IdPrefix {
PipelineEvent,
Platform,
PlatformPage,
Queue,
ScheduledEvent,
SessionId,
Settings,
Transaction,
UnitTest,
EarlyAccess,
Task,
}

impl Display for IdPrefix {
Expand All @@ -56,7 +54,6 @@ impl Display for IdPrefix {
IdPrefix::EventAccess => write!(f, "evt_ac"),
IdPrefix::EventDependency => write!(f, "evt_dep"),
IdPrefix::EventKey => write!(f, "evt_k"),
IdPrefix::Idempotency => write!(f, "idem"),
IdPrefix::Job => write!(f, "job"),
IdPrefix::JobStage => write!(f, "job_stg"),
IdPrefix::LLMMessage => write!(f, "llm_msg"),
Expand All @@ -68,13 +65,12 @@ impl Display for IdPrefix {
IdPrefix::PipelineEvent => write!(f, "pipe_evt"),
IdPrefix::Platform => write!(f, "plf"),
IdPrefix::PlatformPage => write!(f, "plf_pg"),
IdPrefix::Queue => write!(f, "q"),
IdPrefix::ScheduledEvent => write!(f, "sched_evt"),
IdPrefix::SessionId => write!(f, "session_id"),
IdPrefix::Settings => write!(f, "st"),
IdPrefix::Transaction => write!(f, "tx"),
IdPrefix::UnitTest => write!(f, "ut"),
IdPrefix::EarlyAccess => write!(f, "ea"),
IdPrefix::Task => write!(f, "task"),
}
}
}
Expand All @@ -98,7 +94,6 @@ impl TryFrom<&str> for IdPrefix {
"evt_ac" => Ok(IdPrefix::EventAccess),
"evt_dep" => Ok(IdPrefix::EventDependency),
"evt_k" => Ok(IdPrefix::EventKey),
"idem" => Ok(IdPrefix::Idempotency),
"job" => Ok(IdPrefix::Job),
"job_stg" => Ok(IdPrefix::JobStage),
"llm_msg" => Ok(IdPrefix::LLMMessage),
Expand All @@ -110,13 +105,12 @@ impl TryFrom<&str> for IdPrefix {
"pipe_evt" => Ok(IdPrefix::PipelineEvent),
"plf" => Ok(IdPrefix::Platform),
"plf_pg" => Ok(IdPrefix::PlatformPage),
"q" => Ok(IdPrefix::Queue),
"sched_evt" => Ok(IdPrefix::ScheduledEvent),
"session_id" => Ok(IdPrefix::SessionId),
"st" => Ok(IdPrefix::Settings),
"tx" => Ok(IdPrefix::Transaction),
"ut" => Ok(IdPrefix::UnitTest),
"ea" => Ok(IdPrefix::EarlyAccess),
"task" => Ok(IdPrefix::Task),
_ => Err(InternalError::invalid_argument(
&format!("Invalid ID prefix: {}", s),
None,
Expand All @@ -142,7 +136,6 @@ impl From<IdPrefix> for String {
IdPrefix::EventAccess => "evt_ac".to_string(),
IdPrefix::EventDependency => "evt_dep".to_string(),
IdPrefix::EventKey => "evt_k".to_string(),
IdPrefix::Idempotency => "idem".to_string(),
IdPrefix::Job => "job".to_string(),
IdPrefix::JobStage => "job_stg".to_string(),
IdPrefix::LLMMessage => "llm_msg".to_string(),
Expand All @@ -154,13 +147,12 @@ impl From<IdPrefix> for String {
IdPrefix::PipelineEvent => "pipe_evt".to_string(),
IdPrefix::Platform => "plf".to_string(),
IdPrefix::PlatformPage => "plf_pg".to_string(),
IdPrefix::Queue => "q".to_string(),
IdPrefix::ScheduledEvent => "sched_evt".to_string(),
IdPrefix::SessionId => "session_id".to_string(),
IdPrefix::Settings => "st".to_string(),
IdPrefix::Transaction => "tx".to_string(),
IdPrefix::UnitTest => "ut".to_string(),
IdPrefix::EarlyAccess => "ea".to_string(),
IdPrefix::Task => "task".to_string(),
}
}
}
Expand Down Expand Up @@ -216,7 +208,6 @@ mod test {
assert_eq!(IdPrefix::try_from("arch").unwrap(), IdPrefix::Archive);
assert_eq!(IdPrefix::try_from("evt_ac").unwrap(), IdPrefix::EventAccess);
assert_eq!(IdPrefix::try_from("evt_k").unwrap(), IdPrefix::EventKey);
assert_eq!(IdPrefix::try_from("idem").unwrap(), IdPrefix::Idempotency);
assert_eq!(IdPrefix::try_from("job").unwrap(), IdPrefix::Job);
assert_eq!(IdPrefix::try_from("job_stg").unwrap(), IdPrefix::JobStage);
assert_eq!(IdPrefix::try_from("llm_msg").unwrap(), IdPrefix::LLMMessage);
Expand All @@ -229,15 +220,11 @@ mod test {
IdPrefix::PipelineEvent
);
assert_eq!(IdPrefix::try_from("plf").unwrap(), IdPrefix::Platform);
assert_eq!(IdPrefix::try_from("q").unwrap(), IdPrefix::Queue);
assert_eq!(
IdPrefix::try_from("sched_evt").unwrap(),
IdPrefix::ScheduledEvent
);
assert_eq!(IdPrefix::try_from("st").unwrap(), IdPrefix::Settings);
assert_eq!(IdPrefix::try_from("tx").unwrap(), IdPrefix::Transaction);
assert_eq!(IdPrefix::try_from("ut").unwrap(), IdPrefix::UnitTest);
assert_eq!(IdPrefix::try_from("ea").unwrap(), IdPrefix::EarlyAccess);
assert_eq!(IdPrefix::try_from("task").unwrap(), IdPrefix::Task);
}

#[test]
Expand Down Expand Up @@ -266,7 +253,6 @@ mod test {
assert_eq!(format!("{}", IdPrefix::EventAccess), "evt_ac");
assert_eq!(format!("{}", IdPrefix::EventDependency), "evt_dep");
assert_eq!(format!("{}", IdPrefix::EventKey), "evt_k");
assert_eq!(format!("{}", IdPrefix::Idempotency), "idem");
assert_eq!(format!("{}", IdPrefix::Job), "job");
assert_eq!(format!("{}", IdPrefix::JobStage), "job_stg");
assert_eq!(format!("{}", IdPrefix::LLMMessage), "llm_msg");
Expand All @@ -278,11 +264,10 @@ mod test {
assert_eq!(format!("{}", IdPrefix::PipelineEvent), "pipe_evt");
assert_eq!(format!("{}", IdPrefix::Platform), "plf");
assert_eq!(format!("{}", IdPrefix::PlatformPage), "plf_pg");
assert_eq!(format!("{}", IdPrefix::Queue), "q");
assert_eq!(format!("{}", IdPrefix::ScheduledEvent), "sched_evt");
assert_eq!(format!("{}", IdPrefix::Settings), "st");
assert_eq!(format!("{}", IdPrefix::Transaction), "tx");
assert_eq!(format!("{}", IdPrefix::UnitTest), "ut");
assert_eq!(format!("{}", IdPrefix::EarlyAccess), "ea");
assert_eq!(format!("{}", IdPrefix::Task), "task");
}
}
8 changes: 2 additions & 6 deletions entities/src/domain/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ generate_stores!(
"microservices",
PipelineEvents,
"pipeline-events",
Idempotency,
"idempotency",
Deduplication,
"deduplication",
ScheduledEvents,
"scheduled-events",
Events,
"external-events",
EventAccess,
Expand Down Expand Up @@ -83,6 +77,8 @@ generate_stores!(
"secrets",
Settings,
"settings",
Tasks,
"tasks",
EmbedTokens,
"embed-tokens",
Sessions,
Expand Down
1 change: 1 addition & 0 deletions watchdog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ envconfig.workspace = true
futures.workspace = true
cache = { path = "../cache" }
entities = { path = "../entities" }
reqwest.workspace = true
serde_json.workspace = true
mongodb.workspace = true
redis.workspace = true
Expand Down
Loading

0 comments on commit 8b4781b

Please sign in to comment.