diff --git a/Cargo.lock b/Cargo.lock
index 62e9c0b..71738ad 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6166,6 +6166,7 @@ dependencies = [
  "futures",
  "mongodb",
  "redis 0.27.5",
+ "reqwest",
  "serde_json",
  "tokio",
  "tracing",
diff --git a/api/src/logic/mod.rs b/api/src/logic/mod.rs
index 2cbe2ab..503c8d6 100644
--- a/api/src/logic/mod.rs
+++ b/api/src/logic/mod.rs
@@ -40,6 +40,7 @@ pub mod platform;
 pub mod platform_page;
 pub mod schema_generator;
 pub mod secrets;
+pub mod tasks;
 pub mod unified;
 pub mod vault_connection;
diff --git a/api/src/logic/tasks.rs b/api/src/logic/tasks.rs
new file mode 100644
index 0000000..8376051
--- /dev/null
+++ b/api/src/logic/tasks.rs
@@ -0,0 +1,60 @@
+use super::{create, delete, read, update, HookExt, PublicExt, RequestExt};
+use crate::server::{AppState, AppStores};
+use axum::{
+    routing::{patch, post},
+    Router,
+};
+use chrono::Utc;
+use entities::{
+    event_access::EventAccess, prefix::IdPrefix, record_metadata::RecordMetadata, task::Task, Id,
+};
+use fake::Dummy;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use std::sync::Arc;
+
+pub fn get_router() -> Router<Arc<AppState>> {
+    Router::new()
+        .route(
+            "/",
+            post(create::<CreateRequest, Task>).get(read::<CreateRequest, Task>),
+        )
+        .route(
+            "/:id",
+            patch(update::<CreateRequest, Task>).delete(delete::<CreateRequest, Task>),
+        )
+}
+
+#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, Dummy)]
+#[serde(rename_all = "camelCase")]
+pub struct CreateRequest {
+    pub start_time: i64,
+    pub endpoint: String,
+    pub payload: Value,
+}
+
+impl RequestExt for CreateRequest {
+    type Output = Task;
+
+    fn from(&self) -> Option<Self::Output> {
+        Some(Task {
+            id: Id::now(IdPrefix::Task),
+            start_time: Utc::now().timestamp_millis(),
+            end_time: None,
+            payload: self.payload.clone(),
+            endpoint: self.endpoint.clone(),
+            status: None,
+            metadata: RecordMetadata::default(),
+        })
+    }
+
+    fn get_store(stores: AppStores) -> entities::MongoStore<Task> {
+        stores.tasks
+    }
+
+    fn access(&self, _: Arc<EventAccess>) -> Option<Self::Output> {
+        self.from()
+    }
+}
+impl HookExt<Task> for CreateRequest {}
+impl PublicExt<Task> for CreateRequest {}
diff --git a/api/src/router/secured_key.rs b/api/src/router/secured_key.rs
index 56b01eb..84c504a 100644
--- a/api/src/router/secured_key.rs
+++ b/api/src/router/secured_key.rs
@@ -5,7 +5,7 @@ use crate::{
         connection_model_schema::{
            public_get_connection_model_schema, PublicGetConnectionModelSchema,
         },
-        event_access, events, knowledge, metrics, oauth, passthrough, secrets, unified,
+        event_access, events, knowledge, metrics, oauth, passthrough, secrets, tasks, unified,
         vault_connection,
     },
     middleware::{
@@ -38,6 +38,7 @@ pub async fn get_router(state: &Arc<AppState>) -> Router<Arc<AppState>> {
         .nest("/event-access", event_access::get_router())
         .nest("/events", events::get_router())
         .nest("/knowledge", knowledge::get_router())
+        .nest("/tasks", tasks::get_router())
         .nest("/metrics", metrics::get_router())
         .nest("/oauth", oauth::get_router())
         .nest("/passthrough", passthrough::get_router())
diff --git a/api/src/server.rs b/api/src/server.rs
index d1c3720..460c7eb 100644
--- a/api/src/server.rs
+++ b/api/src/server.rs
@@ -24,6 +24,7 @@ use entities::{
     page::PlatformPage,
     secret::Secret,
     secrets::SecretServiceProvider,
+    task::Task,
     user::UserClient,
     Connection, Event, GoogleKms, IOSKms, PlatformData, PublicConnection, SecretExt, Store,
 };
@@ -56,6 +57,7 @@ pub struct AppStores {
     pub knowledge: MongoStore<ConnectionModelDefinition>,
     pub secrets: MongoStore<Secret>,
     pub settings: MongoStore<Settings>,
+    pub tasks: MongoStore<Task>,
 }
 
 #[derive(Clone)]
@@ -83,8 +85,8 @@ pub struct Server {
 impl Server {
     pub async fn init(config: ConnectionsConfig) -> Result<Self, PicaError> {
-        let client = Client::with_uri_str(&config.db_config.control_db_url).await?;
-        let db = client.database(&config.db_config.control_db_name);
+        let client = Client::with_uri_str(&config.db_config.event_db_url).await?;
+        let db = client.database(&config.db_config.event_db_name);
 
         let http_client = reqwest::ClientBuilder::new()
             .timeout(Duration::from_secs(config.http_client_timeout_secs))
             .build()?;
@@ -112,6 +114,7 @@ impl Server {
         let knowledge = MongoStore::new(&db, &Store::ConnectionModelDefinitions).await?;
         let clients = MongoStore::new(&db, &Store::Clients).await?;
         let secrets_store = MongoStore::<Secret>::new(&db, &Store::Secrets).await?;
+        let tasks = MongoStore::new(&db, &Store::Tasks).await?;
 
         let secrets_client: Arc<dyn SecretExt + Sync + Send> =
             match config.secrets_config.provider {
@@ -160,6 +163,7 @@ impl Server {
             knowledge,
             event,
             clients,
+            tasks,
         };
 
         let event_access_cache =
diff --git a/api/tests/http/crud.rs b/api/tests/http/crud.rs
index d9a21d9..0ff2cd7 100644
--- a/api/tests/http/crud.rs
+++ b/api/tests/http/crud.rs
@@ -1,6 +1,7 @@
 use crate::context::TestServer;
-use api::logic::{common_model, ReadResponse};
+use api::logic::{common_model, tasks, ReadResponse};
 use api::logic::{connection_definition, connection_model_definition, connection_model_schema};
+use entities::task::Task;
 use entities::{
     common_model::CommonModel, connection_definition::ConnectionDefinition,
     connection_model_definition::ConnectionModelDefinition,
diff --git a/entities/src/domain/event/mod.rs b/entities/src/domain/event/mod.rs
index cbd0821..0fe71b1 100644
--- a/entities/src/domain/event/mod.rs
+++ b/entities/src/domain/event/mod.rs
@@ -2,6 +2,7 @@ pub mod emitted_events;
 pub mod event_access;
 pub mod event_state;
 pub mod hashes;
+pub mod task;
 
 use self::{
     event_state::EventState,
diff --git a/entities/src/domain/event/task.rs b/entities/src/domain/event/task.rs
new file mode 100644
index 0000000..8e3070a
--- /dev/null
+++ b/entities/src/domain/event/task.rs
@@ -0,0 +1,19 @@
+use crate::{record_metadata::RecordMetadata, Id};
+use http::StatusCode;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
+#[serde(rename_all = "camelCase")]
+pub struct Task {
+    #[serde(rename = "_id")]
+    pub id: Id,
+    pub start_time: i64,
+    pub end_time: Option<i64>,
+    pub payload: Value,
+    pub endpoint: String,
+    #[serde(with = "http_serde_ext_ios::status_code::option")]
+    pub status: Option<StatusCode>,
+    #[serde(flatten)]
+    pub metadata: RecordMetadata,
+}
diff --git a/entities/src/domain/id/prefix.rs b/entities/src/domain/id/prefix.rs
index 426a739..9051680 100644
--- a/entities/src/domain/id/prefix.rs
+++ b/entities/src/domain/id/prefix.rs
@@ -18,7 +18,6 @@ pub enum IdPrefix {
     EventAccess,
     EventDependency,
     EventKey,
-    Idempotency,
     Job,
     JobStage,
     LLMMessage,
@@ -30,13 +29,12 @@
     PipelineEvent,
     Platform,
     PlatformPage,
-    Queue,
-    ScheduledEvent,
     SessionId,
     Settings,
     Transaction,
     UnitTest,
     EarlyAccess,
+    Task,
 }
 
 impl Display for IdPrefix {
@@ -56,7 +54,6 @@
             IdPrefix::EventAccess => write!(f, "evt_ac"),
             IdPrefix::EventDependency => write!(f, "evt_dep"),
             IdPrefix::EventKey => write!(f, "evt_k"),
-            IdPrefix::Idempotency => write!(f, "idem"),
             IdPrefix::Job => write!(f, "job"),
             IdPrefix::JobStage => write!(f, "job_stg"),
             IdPrefix::LLMMessage => write!(f, "llm_msg"),
@@ -68,13 +65,12 @@
             IdPrefix::PipelineEvent => write!(f, "pipe_evt"),
             IdPrefix::Platform => write!(f, "plf"),
             IdPrefix::PlatformPage => write!(f, "plf_pg"),
-            IdPrefix::Queue => write!(f, "q"),
-            IdPrefix::ScheduledEvent => write!(f, "sched_evt"),
             IdPrefix::SessionId => write!(f, "session_id"),
             IdPrefix::Settings => write!(f, "st"),
             IdPrefix::Transaction => write!(f, "tx"),
             IdPrefix::UnitTest => write!(f, "ut"),
             IdPrefix::EarlyAccess => write!(f, "ea"),
+            IdPrefix::Task => write!(f, "task"),
         }
     }
 }
@@ -98,7 +94,6 @@ impl TryFrom<&str> for IdPrefix {
             "evt_ac" => Ok(IdPrefix::EventAccess),
             "evt_dep" => Ok(IdPrefix::EventDependency),
             "evt_k" => Ok(IdPrefix::EventKey),
-            "idem" => Ok(IdPrefix::Idempotency),
             "job" => Ok(IdPrefix::Job),
             "job_stg" => Ok(IdPrefix::JobStage),
             "llm_msg" => Ok(IdPrefix::LLMMessage),
@@ -110,13 +105,12 @@
             "pipe_evt" => Ok(IdPrefix::PipelineEvent),
             "plf" => Ok(IdPrefix::Platform),
             "plf_pg" => Ok(IdPrefix::PlatformPage),
-            "q" => Ok(IdPrefix::Queue),
-            "sched_evt" => Ok(IdPrefix::ScheduledEvent),
             "session_id" => Ok(IdPrefix::SessionId),
             "st" => Ok(IdPrefix::Settings),
             "tx" => Ok(IdPrefix::Transaction),
             "ut" => Ok(IdPrefix::UnitTest),
             "ea" => Ok(IdPrefix::EarlyAccess),
+            "task" => Ok(IdPrefix::Task),
             _ => Err(InternalError::invalid_argument(
                 &format!("Invalid ID prefix: {}", s),
                 None,
@@ -142,7 +136,6 @@ impl From<IdPrefix> for String {
             IdPrefix::EventAccess => "evt_ac".to_string(),
             IdPrefix::EventDependency => "evt_dep".to_string(),
             IdPrefix::EventKey => "evt_k".to_string(),
-            IdPrefix::Idempotency => "idem".to_string(),
             IdPrefix::Job => "job".to_string(),
             IdPrefix::JobStage => "job_stg".to_string(),
             IdPrefix::LLMMessage => "llm_msg".to_string(),
@@ -154,13 +147,12 @@
             IdPrefix::PipelineEvent => "pipe_evt".to_string(),
             IdPrefix::Platform => "plf".to_string(),
             IdPrefix::PlatformPage => "plf_pg".to_string(),
-            IdPrefix::Queue => "q".to_string(),
-            IdPrefix::ScheduledEvent => "sched_evt".to_string(),
             IdPrefix::SessionId => "session_id".to_string(),
             IdPrefix::Settings => "st".to_string(),
             IdPrefix::Transaction => "tx".to_string(),
             IdPrefix::UnitTest => "ut".to_string(),
             IdPrefix::EarlyAccess => "ea".to_string(),
+            IdPrefix::Task => "task".to_string(),
         }
     }
 }
@@ -216,7 +208,6 @@ mod test {
         assert_eq!(IdPrefix::try_from("arch").unwrap(), IdPrefix::Archive);
         assert_eq!(IdPrefix::try_from("evt_ac").unwrap(), IdPrefix::EventAccess);
         assert_eq!(IdPrefix::try_from("evt_k").unwrap(), IdPrefix::EventKey);
-        assert_eq!(IdPrefix::try_from("idem").unwrap(), IdPrefix::Idempotency);
         assert_eq!(IdPrefix::try_from("job").unwrap(), IdPrefix::Job);
         assert_eq!(IdPrefix::try_from("job_stg").unwrap(), IdPrefix::JobStage);
         assert_eq!(IdPrefix::try_from("llm_msg").unwrap(), IdPrefix::LLMMessage);
@@ -229,15 +220,11 @@
         assert_eq!(
             IdPrefix::try_from("pipe_evt").unwrap(),
             IdPrefix::PipelineEvent
         );
         assert_eq!(IdPrefix::try_from("plf").unwrap(), IdPrefix::Platform);
-        assert_eq!(IdPrefix::try_from("q").unwrap(), IdPrefix::Queue);
-        assert_eq!(
-            IdPrefix::try_from("sched_evt").unwrap(),
-            IdPrefix::ScheduledEvent
-        );
         assert_eq!(IdPrefix::try_from("st").unwrap(), IdPrefix::Settings);
         assert_eq!(IdPrefix::try_from("tx").unwrap(), IdPrefix::Transaction);
         assert_eq!(IdPrefix::try_from("ut").unwrap(), IdPrefix::UnitTest);
         assert_eq!(IdPrefix::try_from("ea").unwrap(), IdPrefix::EarlyAccess);
+        assert_eq!(IdPrefix::try_from("task").unwrap(), IdPrefix::Task);
     }
 
     #[test]
@@ -266,7 +253,6 @@
         assert_eq!(format!("{}", IdPrefix::EventAccess), "evt_ac");
         assert_eq!(format!("{}", IdPrefix::EventDependency), "evt_dep");
         assert_eq!(format!("{}", IdPrefix::EventKey), "evt_k");
-        assert_eq!(format!("{}", IdPrefix::Idempotency), "idem");
         assert_eq!(format!("{}", IdPrefix::Job), "job");
assert_eq!(format!("{}", IdPrefix::JobStage), "job_stg"); assert_eq!(format!("{}", IdPrefix::LLMMessage), "llm_msg"); @@ -278,11 +264,10 @@ mod test { assert_eq!(format!("{}", IdPrefix::PipelineEvent), "pipe_evt"); assert_eq!(format!("{}", IdPrefix::Platform), "plf"); assert_eq!(format!("{}", IdPrefix::PlatformPage), "plf_pg"); - assert_eq!(format!("{}", IdPrefix::Queue), "q"); - assert_eq!(format!("{}", IdPrefix::ScheduledEvent), "sched_evt"); assert_eq!(format!("{}", IdPrefix::Settings), "st"); assert_eq!(format!("{}", IdPrefix::Transaction), "tx"); assert_eq!(format!("{}", IdPrefix::UnitTest), "ut"); assert_eq!(format!("{}", IdPrefix::EarlyAccess), "ea"); + assert_eq!(format!("{}", IdPrefix::Task), "task"); } } diff --git a/entities/src/domain/store/mod.rs b/entities/src/domain/store/mod.rs index 13e82f7..9df1745 100644 --- a/entities/src/domain/store/mod.rs +++ b/entities/src/domain/store/mod.rs @@ -43,12 +43,6 @@ generate_stores!( "microservices", PipelineEvents, "pipeline-events", - Idempotency, - "idempotency", - Deduplication, - "deduplication", - ScheduledEvents, - "scheduled-events", Events, "external-events", EventAccess, @@ -83,6 +77,8 @@ generate_stores!( "secrets", Settings, "settings", + Tasks, + "tasks", EmbedTokens, "embed-tokens", Sessions, diff --git a/watchdog/Cargo.toml b/watchdog/Cargo.toml index 4aa5973..ac7a3ed 100644 --- a/watchdog/Cargo.toml +++ b/watchdog/Cargo.toml @@ -12,6 +12,7 @@ envconfig.workspace = true futures.workspace = true cache = { path = "../cache" } entities = { path = "../entities" } +reqwest.workspace = true serde_json.workspace = true mongodb.workspace = true redis.workspace = true diff --git a/watchdog/src/client.rs b/watchdog/src/client.rs index a54c23c..eb9afc4 100644 --- a/watchdog/src/client.rs +++ b/watchdog/src/client.rs @@ -1,7 +1,12 @@ use crate::config::WatchdogConfig; +use bson::doc; use cache::remote::RedisCache; use chrono::Utc; -use entities::{cache::CacheConfig, database::DatabaseConfig, InternalError, PicaError}; +use entities::{ + cache::CacheConfig, database::DatabaseConfig, task::Task, Id, InternalError, MongoStore, + PicaError, Store, Unit, +}; +use futures::{stream::FuturesUnordered, StreamExt}; use redis::{AsyncCommands, RedisResult}; use std::fmt::Display; use std::time::Duration; @@ -11,6 +16,8 @@ pub struct WatchdogClient { watchdog: WatchdogConfig, cache: CacheConfig, database: DatabaseConfig, + client: reqwest::Client, + tasks: MongoStore, } impl Display for WatchdogClient { @@ -27,20 +34,35 @@ impl Display for WatchdogClient { } impl WatchdogClient { - pub fn new(watchdog: WatchdogConfig, cache: CacheConfig, database: DatabaseConfig) -> Self { - Self { + pub async fn new( + watchdog: WatchdogConfig, + cache: CacheConfig, + database: DatabaseConfig, + ) -> Result { + let http_client = reqwest::ClientBuilder::new() + .timeout(Duration::from_secs(watchdog.http_client_timeout_secs)) + .build()?; + let client = mongodb::Client::with_uri_str(&database.event_db_url).await?; + let db = client.database(&database.event_db_name); + + let tasks: MongoStore = MongoStore::new(&db, &Store::Tasks).await?; + + Ok(Self { watchdog, cache, database, - } + client: http_client, + tasks, + }) } - pub async fn start(self) -> Result<(), PicaError> { + pub async fn start(self) -> Result { self.run().await } - pub async fn run(self) -> Result<(), PicaError> { + async fn run(self) -> Result { info!("Starting watchdog"); + let cache = RedisCache::new(&self.cache).await.map_err(|e| { error!("Could not connect to cache: {e}"); 
             InternalError::io_err(e.to_string().as_str(), None)
@@ -65,6 +87,43 @@
         loop {
             let _: RedisResult<()> = async { redis_clone.del(key.clone()).await }.await;
             tracing::info!("Rate limiter cleared for {key} at {}", Utc::now());
+
+            let tasks: Vec<Task> = self
+                .tasks
+                .get_many(
+                    Some(doc! {
+                        "active": true,
+                        "startTime": {
+                            "$lte": Utc::now().timestamp_millis()
+                    }}),
+                    None,
+                    None,
+                    Some(self.watchdog.max_amount_of_tasks_to_process),
+                    None,
+                )
+                .await?;
+
+            let client = self.client.clone();
+            let tasks_store = self.tasks.clone();
+
+            tokio::spawn(async move {
+                let mut tasks = tasks
+                    .into_iter()
+                    .map(|task| execute(task, client.clone(), tasks_store.clone()))
+                    .collect::<FuturesUnordered<_>>();
+
+                while let Some(result) = tasks.next().await {
+                    match result {
+                        Ok(id) => tracing::info!("Task {id} executed successfully"),
+                        Err(e) => {
+                            tracing::error!("Error executing task: {e}");
+                        }
+                    }
+                }
+            });
+
+            tracing::info!("Executing next batch of tasks");
+
             tokio::time::sleep(Duration::from_secs(
                 self.watchdog.rate_limiter_refresh_interval,
             ))
             .await;
         }
     }
 }
+
+async fn execute(
+    task: Task,
+    http_client: reqwest::Client,
+    tasks_store: MongoStore<Task>,
+) -> Result<Id, PicaError> {
+    let request = http_client
+        .post(task.endpoint)
+        .json(&task.payload)
+        .send()
+        .await?;
+
+    tasks_store
+        .collection
+        .find_one_and_update(
+            doc! {
+                "_id": task.id.to_string() // Filter by task ID
+            },
+            doc! {
+                "$set": { // Record the response status and end time, and mark the task inactive
+                    "status": request.status().to_string(),
+                    "endTime": Utc::now().timestamp_millis(),
+                    "active": false
+                }
+            },
+        )
+        .await?;
+
+    Ok(task.id)
+}
diff --git a/watchdog/src/config.rs b/watchdog/src/config.rs
index 73e3682..997c3ff 100644
--- a/watchdog/src/config.rs
+++ b/watchdog/src/config.rs
@@ -6,6 +6,10 @@
 pub struct WatchdogConfig {
     #[envconfig(from = "RATE_LIMITER_REFRESH_INTERVAL", default = "60")]
     pub rate_limiter_refresh_interval: u64,
+    #[envconfig(from = "HTTP_CLIENT_TIMEOUT_SECS", default = "10")]
+    pub http_client_timeout_secs: u64,
+    #[envconfig(from = "MAX_AMOUNT_OF_TASKS_TO_PROCESS", default = "100")]
+    pub max_amount_of_tasks_to_process: u64,
     #[envconfig(nested = true)]
     pub redis: CacheConfig,
     #[envconfig(nested = true)]
@@ -19,6 +23,11 @@ impl Display for WatchdogConfig {
             "RATE_LIMITER_REFRESH_INTERVAL: {}",
             self.rate_limiter_refresh_interval
         )?;
+        writeln!(
+            f,
+            "HTTP_CLIENT_TIMEOUT_SECS: {}",
+            self.http_client_timeout_secs
+        )?;
         writeln!(f, "{}", self.redis)?;
         writeln!(f, "{}", self.db)
     }
diff --git a/watchdog/src/main.rs b/watchdog/src/main.rs
index 7949c30..e5f865f 100644
--- a/watchdog/src/main.rs
+++ b/watchdog/src/main.rs
@@ -26,7 +26,7 @@ async fn main() -> Result<()> {
 
     info!("Starting watchdog with config: {watchdog_config}{cache_config}{database_config}");
 
-    let client = WatchdogClient::new(watchdog_config, cache_config, database_config);
+    let client = WatchdogClient::new(watchdog_config, cache_config, database_config).await?;
 
     client.start().await?;
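
Usage sketch (not part of the patch): once this lands, a client could schedule a task through the new `/tasks` route and let the watchdog deliver it. The base URL, port, and `x-pica-secret` header below are assumptions about a local deployment rather than anything this diff defines; the JSON keys mirror the camelCase `CreateRequest` introduced in `api/src/logic/tasks.rs`.

```rust
// Illustrative sketch only; the base URL and auth header are assumed, not defined by this PR.
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();

    // Schedule a task: the watchdog later POSTs `payload` to `endpoint`
    // once `startTime` has passed, then records the response status.
    let res = client
        .post("http://localhost:3005/v1/tasks") // assumed base URL and port
        .header("x-pica-secret", "sk_test_...") // assumed auth header name
        .json(&json!({
            "startTime": chrono::Utc::now().timestamp_millis(),
            "endpoint": "https://example.com/hooks/task-done",
            "payload": { "hello": "world" }
        }))
        .send()
        .await?;

    println!("created task: {}", res.text().await?);
    Ok(())
}
```

Per the watchdog changes above, each refresh interval the loop fetches up to `MAX_AMOUNT_OF_TASKS_TO_PROCESS` due tasks, POSTs their payloads, and writes back `status`, `endTime`, and `active: false`.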