From f0d56c7fcd0abcd213b3d57f88590ead411b5ab4 Mon Sep 17 00:00:00 2001 From: Samuel <39674930+sagojez@users.noreply.github.com> Date: Mon, 9 Dec 2024 14:59:35 +0000 Subject: [PATCH] feat: implement metrics for event emitter (#202) --- .github/workflows/tests.yaml | 2 +- Cargo.lock | 70 ++++- integrationos-api/src/logic/connection.rs | 2 + integrationos-api/src/logic/event_callback.rs | 7 +- integrationos-api/src/logic/oauth.rs | 2 + integrationos-api/tests/checker.rs | 5 +- integrationos-api/tests/context.rs | 22 +- integrationos-api/tests/http/callback.rs | 15 +- integrationos-api/tests/standard/mod.rs | 2 + integrationos-database/Cargo.toml | 1 + integrationos-database/src/main.rs | 21 +- integrationos-database/src/service/init.rs | 58 ++-- integrationos-database/tests/context.rs | 19 +- .../tests/http/connection.rs | 11 +- integrationos-database/tests/http/mod.rs | 1 + integrationos-database/tests/http/signal.rs | 48 +++ .../src/domain/connection/mod.rs | 11 + .../src/domain/event/emitted_events.rs | 27 +- integrationos-emit/Cargo.toml | 3 +- integrationos-emit/src/algebra/event.rs | 17 +- integrationos-emit/src/algebra/metrics.rs | 103 ++++++ integrationos-emit/src/algebra/mod.rs | 1 + integrationos-emit/src/domain/config.rs | 70 +++-- integrationos-emit/src/domain/event.rs | 13 +- integrationos-emit/src/main.rs | 10 +- integrationos-emit/src/router/emitter.rs | 31 ++ integrationos-emit/src/router/metrics.rs | 8 + integrationos-emit/src/router/mod.rs | 31 +- integrationos-emit/src/server.rs | 30 +- .../src/stream/fluvio_driver.rs | 297 +++++++++++------- integrationos-emit/tests/context.rs | 66 ++-- integrationos-emit/tests/http/emitter.rs | 4 +- integrationos-event/tests/mock_destination.rs | 2 + 33 files changed, 721 insertions(+), 289 deletions(-) create mode 100644 integrationos-database/tests/http/signal.rs create mode 100644 integrationos-emit/src/algebra/metrics.rs create mode 100644 integrationos-emit/src/router/emitter.rs create mode 100644 integrationos-emit/src/router/metrics.rs diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 76ce3d21..1375145b 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -57,7 +57,7 @@ jobs: - name: Install fluvio CLI run: curl -fsS https://hub.infinyon.cloud/install/install.sh | bash - name: Create fluvio topic - run: ~/.fluvio/bin/fluvio profile add docker 127.0.0.1:9103 docker && ~/.fluvio/bin/fluvio topic create -p 2 events && ~/.fluvio/bin/fluvio topic create -p 2 dlq + run: ~/.fluvio/bin/fluvio profile add docker 127.0.0.1:9103 docker && ~/.fluvio/bin/fluvio topic create events && ~/.fluvio/bin/fluvio topic create dlq - name: Install protoc run: sudo apt-get update && sudo apt-get install -y protobuf-compiler - uses: dtolnay/rust-toolchain@stable diff --git a/Cargo.lock b/Cargo.lock index 6f5a2966..b7062251 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -474,6 +474,27 @@ dependencies = [ "tower-http", ] +[[package]] +name = "axum-prometheus" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "739e2585f5376f5bdd129324ded72d3261fdd5b7c411a645920328fb5dc875d4" +dependencies = [ + "axum", + "bytes", + "futures-core", + "http 1.1.0", + "http-body 1.0.1", + "matchit 0.7.3", + "metrics 0.23.0", + "metrics-exporter-prometheus 0.15.3", + "once_cell", + "pin-project", + "tokio", + "tower 0.4.13", + "tower-http", +] + [[package]] name = "backoff" version = "0.4.0" @@ -3099,6 +3120,7 @@ dependencies = [ "integrationos-cache", "integrationos-domain", "integrationos-unified", + "mockito", "mongodb", "num_cpus", "rand", @@ -3188,6 +3210,7 @@ dependencies = [ "anyhow", "async-trait", "axum", + "axum-prometheus 0.7.0", "chrono", "dotenvy", "envconfig", @@ -3257,7 +3280,7 @@ dependencies = [ "anyhow", "async-trait", "axum", - "axum-prometheus", + "axum-prometheus 0.6.1", "dotenvy", "envconfig", "http 1.1.0", @@ -3874,6 +3897,16 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "metrics" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "884adb57038347dfbaf2d5065887b6cf4312330dc8e94bc30a1a839bd79d3261" +dependencies = [ + "ahash 0.8.11", + "portable-atomic", +] + [[package]] name = "metrics-exporter-prometheus" version = "0.12.2" @@ -3909,6 +3942,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "metrics-exporter-prometheus" +version = "0.15.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4f0c8427b39666bf970460908b213ec09b3b350f20c0c2eabcbba51704a08e6" +dependencies = [ + "base64 0.22.1", + "http-body-util", + "hyper 1.5.1", + "hyper-util", + "indexmap 2.6.0", + "ipnet", + "metrics 0.23.0", + "metrics-util 0.17.0", + "quanta 0.12.3", + "thiserror 1.0.69", + "tokio", + "tracing", +] + [[package]] name = "metrics-macros" version = "0.7.1" @@ -3950,6 +4003,21 @@ dependencies = [ "sketches-ddsketch", ] +[[package]] +name = "metrics-util" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4259040465c955f9f2f1a4a8a16dc46726169bca0f88e8fb2dbeced487c3e828" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.14.5", + "metrics 0.23.0", + "num_cpus", + "quanta 0.12.3", + "sketches-ddsketch", +] + [[package]] name = "miette" version = "7.2.0" diff --git a/integrationos-api/src/logic/connection.rs b/integrationos-api/src/logic/connection.rs index 94213dc8..260feeaf 100644 --- a/integrationos-api/src/logic/connection.rs +++ b/integrationos-api/src/logic/connection.rs @@ -321,6 +321,8 @@ pub async fn create_connection( group, identity: Some(identity.to_owned()), name: payload.name, + has_error: false, + error: None, identity_type: payload.identity_type, platform: connection_config.platform.into(), environment: event_access.environment, diff --git a/integrationos-api/src/logic/event_callback.rs b/integrationos-api/src/logic/event_callback.rs index 631dcae0..6681e26d 100644 --- a/integrationos-api/src/logic/event_callback.rs +++ b/integrationos-api/src/logic/event_callback.rs @@ -9,7 +9,9 @@ use axum::{ Json, Router, }; use bson::doc; -use integrationos_domain::{ApplicationError, Connection, Id, IntegrationOSError}; +use integrationos_domain::{ + emitted_events::ConnectionLostReason, ApplicationError, Connection, Id, IntegrationOSError, +}; use std::sync::Arc; pub fn get_router() -> Router> { @@ -19,10 +21,10 @@ pub fn get_router() -> Router> { ) } -// TODO: Write tests for this endpoint async fn database_connection_lost_callback( State(state): State>, Path(connection_id): Path, + Json(reason): Json, ) -> Result, IntegrationOSError> { // Instead of direcly updating we're getting the record first so that we can // modify the active and deprecated fields from the record metadata @@ -41,6 +43,7 @@ async fn database_connection_lost_callback( )), Some(mut conn) => { if conn.record_metadata.active { + conn.mark_error(reason.reason.as_str()); conn.record_metadata.mark_deprecated("system"); conn.record_metadata.mark_inactive("system"); conn.record_metadata.mark_updated("system"); diff --git a/integrationos-api/src/logic/oauth.rs b/integrationos-api/src/logic/oauth.rs index c573e2a8..158646bb 100644 --- a/integrationos-api/src/logic/oauth.rs +++ b/integrationos-api/src/logic/oauth.rs @@ -255,6 +255,8 @@ async fn oauth_handler( identity: Some(identity), identity_type: payload.identity_type, settings: conn_definition.settings, + has_error: false, + error: None, throughput: Throughput { key, limit: throughput, diff --git a/integrationos-api/tests/checker.rs b/integrationos-api/tests/checker.rs index 1658027b..31d4784f 100644 --- a/integrationos-api/tests/checker.rs +++ b/integrationos-api/tests/checker.rs @@ -4,7 +4,6 @@ use std::{ }; use serde::{de::DeserializeOwned, Serialize}; -use serde_json::Value; pub enum CheckType { Json, @@ -87,10 +86,10 @@ impl JsonChecker for JsonCheckerImpl { file.read_to_string(&mut contents) .expect("Failed to read file contents"); - let expected = serde_json::from_str::(&contents) + let expected = serde_json::from_str::(&contents) .expect("Failed to deserialize expect value"); - let actual = serde_json::from_str::(&serialized) + let actual = serde_json::from_str::(&serialized) .expect("Failed to deserialize actual value"); expected == actual diff --git a/integrationos-api/tests/context.rs b/integrationos-api/tests/context.rs index 4701ce25..aadd38de 100644 --- a/integrationos-api/tests/context.rs +++ b/integrationos-api/tests/context.rs @@ -104,9 +104,9 @@ impl SecretExt for MockSecretsClient { } #[derive(Debug, Clone, Eq, PartialEq)] -pub struct ApiResponse { +pub struct ApiResponse { pub code: StatusCode, - pub data: T, + pub data: Data, } impl TestServer { @@ -268,13 +268,13 @@ impl TestServer { } } - pub async fn send_request( + pub async fn send_request( &self, path: &str, method: http::Method, key: Option<&str>, - payload: Option<&T>, - ) -> Result> { + payload: Option<&Payload>, + ) -> Result> { self.send_request_with_auth_headers( path, method, @@ -288,14 +288,14 @@ impl TestServer { .await } - pub async fn send_request_with_headers( + pub async fn send_request_with_headers( &self, path: &str, method: http::Method, key: Option<&str>, - payload: Option<&T>, + payload: Option<&Payload>, headers: Option>, - ) -> Result> { + ) -> Result> { let mut req = self .client .request(method, format!("http://localhost:{}/{path}", self.port)); @@ -319,14 +319,14 @@ impl TestServer { }) } - async fn send_request_with_auth_headers( + async fn send_request_with_auth_headers( &self, path: &str, method: http::Method, key: Option<&str>, - payload: Option<&T>, + payload: Option<&Payload>, headers: Option>, - ) -> Result> { + ) -> Result> { let headers = match headers { Some(h) => h .into_iter() diff --git a/integrationos-api/tests/http/callback.rs b/integrationos-api/tests/http/callback.rs index 8a5f6214..a70f2975 100644 --- a/integrationos-api/tests/http/callback.rs +++ b/integrationos-api/tests/http/callback.rs @@ -1,6 +1,9 @@ use crate::context::TestServer; use http::{Method, StatusCode}; -use integrationos_domain::{environment::Environment, prefix::IdPrefix, Connection, Id}; +use integrationos_domain::{ + emitted_events::ConnectionLostReason, environment::Environment, prefix::IdPrefix, Connection, + Id, +}; use serde_json::Value; #[tokio::test] @@ -13,9 +16,12 @@ async fn test_database_connection_lost_callback() { let connection_id = connection.id.to_string(); let path = format!("v1/event-callbacks/database-connection-lost/{connection_id}"); + let reason = ConnectionLostReason { + reason: "database-connection-lost".to_string(), + }; let request = server - .send_request::(&path, Method::POST, None, None) + .send_request::(&path, Method::POST, None, Some(&reason)) .await .expect("Failed to send request"); @@ -31,9 +37,12 @@ async fn test_database_connection_lost_callback_404() { let connection_id = Id::now(IdPrefix::Connection).to_string(); let path = format!("v1/event-callbacks/database-connection-lost/{connection_id}"); + let reason = ConnectionLostReason { + reason: "database-connection-lost".to_string(), + }; let request = server - .send_request::(&path, Method::POST, None, None) + .send_request::(&path, Method::POST, None, Some(&reason)) .await .expect("Failed to send request"); diff --git a/integrationos-api/tests/standard/mod.rs b/integrationos-api/tests/standard/mod.rs index d70dd58e..00433219 100644 --- a/integrationos-api/tests/standard/mod.rs +++ b/integrationos-api/tests/standard/mod.rs @@ -196,6 +196,8 @@ fn test_json_connection() { key: "throughput-key".to_string(), limit: 100, }, + has_error: false, + error: None, ownership: Ownership { id: "owner-id".to_string().into(), client_id: "client-id".to_string(), diff --git a/integrationos-database/Cargo.toml b/integrationos-database/Cargo.toml index e8e2e3c0..ea7288b1 100644 --- a/integrationos-database/Cargo.toml +++ b/integrationos-database/Cargo.toml @@ -33,4 +33,5 @@ tracing-subscriber.workspace = true tracing.workspace = true [dev-dependencies] +mockito.workspace = true testcontainers-modules = { workspace = true, features = ["postgres"] } diff --git a/integrationos-database/src/main.rs b/integrationos-database/src/main.rs index 8d9328c3..33fe0e4d 100644 --- a/integrationos-database/src/main.rs +++ b/integrationos-database/src/main.rs @@ -26,23 +26,14 @@ fn main() -> Result<()> { .block_on(async move { match config.database_connection_type { DatabaseConnectionType::PostgreSql => { - match PostgresDatabaseConnection::init(&config).await { - Ok(server) => { - if let Err(e) = server.run().await { - PostgresDatabaseConnection::kill(&config).await?; - return Err(anyhow::anyhow!("Could not run server: {e}")); - } + let server = PostgresDatabaseConnection::init(&config).await?; - Ok(()) - } - Err(e) => { - tracing::error!("Could not initialize storage: {e}"); - - PostgresDatabaseConnection::kill(&config).await?; - - Err(anyhow::anyhow!("Could not initialize storage: {e}")) - } + if let Err(e) = server.run().await { + PostgresDatabaseConnection::kill(&config, e.to_string()).await?; + return Err(e); } + + Ok(()) } } }) diff --git a/integrationos-database/src/service/init.rs b/integrationos-database/src/service/init.rs index 249d32d7..55b5dfe6 100644 --- a/integrationos-database/src/service/init.rs +++ b/integrationos-database/src/service/init.rs @@ -4,49 +4,67 @@ use crate::{ server::{AppState, Server}, }; use axum::async_trait; -use integrationos_domain::{database::DatabaseConnectionConfig, Unit}; +use integrationos_domain::{ + database::DatabaseConnectionConfig, emitted_events::DatabaseConnectionLost, Id, Unit, +}; use reqwest::Client; -use serde_json::json; -use std::sync::Arc; +use std::{str::FromStr, sync::Arc}; #[async_trait] pub trait Initializer { async fn init(config: &DatabaseConnectionConfig) -> Result; - async fn kill(config: &DatabaseConnectionConfig) -> Result; + async fn kill(config: &DatabaseConnectionConfig, reason: String) + -> Result; } #[async_trait] impl Initializer for PostgresDatabaseConnection { async fn init(config: &DatabaseConnectionConfig) -> Result { - let postgres: PostgresDatabaseConnection = PostgresDatabaseConnection::new(config).await?; - let storage: Arc = Arc::new(postgres); - - Ok(Server { - state: Arc::new(AppState { - config: config.clone(), - storage, - }), - }) + let postgres = PostgresDatabaseConnection::new(config).await; + + match postgres { + Ok(postgres) => { + let storage: Arc = Arc::new(postgres); + + Ok(Server { + state: Arc::new(AppState { + config: config.clone(), + storage, + }), + }) + } + Err(e) => { + PostgresDatabaseConnection::kill(config, e.to_string()).await?; + Err(e) + } + } } - async fn kill(config: &DatabaseConnectionConfig) -> Result { + async fn kill( + config: &DatabaseConnectionConfig, + reason: String, + ) -> Result { let emit_url = config.emit_url.clone(); - let connection_id = config.connection_id.clone(); + let connection_id = Id::from_str(&config.connection_id)?; let client = Client::new(); - let value = json!({ - "type": "DatabaseConnectionLost", - "connectionId": connection_id - }); + let value = DatabaseConnectionLost { + connection_id, + reason: Some(reason), + schedule_on: None, + } + .as_event(); tracing::info!("Emitting event {value:?} to dispose of connection {connection_id}"); client .post(format!("{}/v1/emit", emit_url)) .header("content-type", "application/json") - .body(value.to_string()) + .json(&value) .send() .await?; + tracing::info!("Event for dispose of connection {connection_id} emitted"); + Ok(()) } } diff --git a/integrationos-database/tests/context.rs b/integrationos-database/tests/context.rs index 44756bec..e39e4d94 100644 --- a/integrationos-database/tests/context.rs +++ b/integrationos-database/tests/context.rs @@ -25,6 +25,7 @@ static TRACING: OnceLock<()> = OnceLock::new(); pub struct TestServer { pub port: u16, pub client: reqwest::Client, + // pub mock_server: ServerGuard, } #[derive(Debug, Clone, Eq, PartialEq)] @@ -34,7 +35,7 @@ pub struct ApiResponse { } impl TestServer { - pub async fn new() -> Result { + pub async fn new(r#override: HashMap) -> Result { TRACING.get_or_init(|| { let filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) @@ -53,7 +54,7 @@ impl TestServer { .expect("Failed to get local address") .port(); - let config = DatabaseConnectionConfig::init_from_hashmap(&HashMap::from([ + let config_map: HashMap = HashMap::from([ ( "INTERNAL_SERVER_ADDRESS".to_string(), format!("0.0.0.0:{server_port}"), @@ -71,12 +72,15 @@ impl TestServer { ("POSTGRES_HOST".to_string(), "localhost".to_string()), ("POSTGRES_PORT".to_string(), port.to_string()), ("POSTGRES_NAME".to_string(), "postgres".to_string()), - ])) - .expect("Failed to initialize storage config"); + ]) + .into_iter() + .chain(r#override.into_iter()) + .collect::>(); - let server = PostgresDatabaseConnection::init(&config) - .await - .expect("Failed to initialize storage"); + let config = DatabaseConnectionConfig::init_from_hashmap(&config_map) + .expect("Failed to initialize storage config"); + + let server = PostgresDatabaseConnection::init(&config).await?; tokio::task::spawn(async move { server.run().await }); @@ -87,6 +91,7 @@ impl TestServer { Ok(Self { port: server_port, client, + // mock_server, }) } diff --git a/integrationos-database/tests/http/connection.rs b/integrationos-database/tests/http/connection.rs index ad3ed208..1d56a80c 100644 --- a/integrationos-database/tests/http/connection.rs +++ b/integrationos-database/tests/http/connection.rs @@ -1,11 +1,12 @@ use crate::context::TestServer; use http::{Method, StatusCode}; -use integrationos_domain::IntegrationOSError; +use integrationos_domain::{IntegrationOSError, Unit}; use serde_json::Value; +use std::collections::HashMap; #[tokio::test] -async fn test_execute_probe() -> Result<(), IntegrationOSError> { - let server = TestServer::new().await?; +async fn test_execute_probe() -> Result { + let server = TestServer::new(HashMap::new()).await?; let result = server .send_request::("database/probe", Method::GET, None) .await?; @@ -18,8 +19,8 @@ async fn test_execute_probe() -> Result<(), IntegrationOSError> { } #[tokio::test] -async fn test_execute_raw() -> Result<(), IntegrationOSError> { - let server = TestServer::new().await?; +async fn test_execute_raw() -> Result { + let server = TestServer::new(HashMap::new()).await?; let create_query = "CREATE TABLE IF NOT EXISTS users (id BIGSERIAL PRIMARY KEY, name TEXT NOT NULL);"; diff --git a/integrationos-database/tests/http/mod.rs b/integrationos-database/tests/http/mod.rs index b3b606b4..b7aae54d 100644 --- a/integrationos-database/tests/http/mod.rs +++ b/integrationos-database/tests/http/mod.rs @@ -1 +1,2 @@ pub mod connection; +pub mod signal; diff --git a/integrationos-database/tests/http/signal.rs b/integrationos-database/tests/http/signal.rs new file mode 100644 index 00000000..1fe050e9 --- /dev/null +++ b/integrationos-database/tests/http/signal.rs @@ -0,0 +1,48 @@ +use crate::context::TestServer; +use http::header::{ACCEPT, CONTENT_LENGTH, CONTENT_TYPE, HOST}; +use integrationos_domain::{ + emitted_events::DatabaseConnectionLost, prefix::IdPrefix, Id, IntegrationOSError, Unit, +}; +use mockito::Server as MockServer; +use std::collections::HashMap; + +#[tokio::test] +async fn test_kill_signal() -> Result { + let mut mock_server = MockServer::new_async().await; + let mock_uri = mock_server.url(); + + let connection_id = Id::now(IdPrefix::Connection); + + let path = "/v1/emit"; + let body = DatabaseConnectionLost { + connection_id, + reason: Some( + "error returned from database: password authentication failed for user \"postgres\"" + .to_string(), + ), + schedule_on: None, + } + .as_event(); + + let mock_server = mock_server + .mock("POST", path) + .match_header(CONTENT_TYPE, "application/json") + .match_header(ACCEPT, "*/*") + .match_header(HOST, mock_server.host_with_port().as_str()) + .match_header(CONTENT_LENGTH, body.to_string().len().to_string().as_str()) + .match_body(&*body.to_string()) + .with_status(200) + .create_async() + .await; + + let _ = TestServer::new(HashMap::from([ + ("CONNECTION_ID".to_string(), connection_id.to_string()), + ("POSTGRES_PASSWORD".to_string(), "wrongpass".to_string()), + ("EMIT_URL".to_string(), mock_uri), + ])) + .await; + + mock_server.expect(1).assert_async().await; + + Ok(()) +} diff --git a/integrationos-domain/src/domain/connection/mod.rs b/integrationos-domain/src/domain/connection/mod.rs index ac61d267..3717cf60 100644 --- a/integrationos-domain/src/domain/connection/mod.rs +++ b/integrationos-domain/src/domain/connection/mod.rs @@ -42,10 +42,21 @@ pub struct Connection { pub ownership: Ownership, #[serde(default)] pub oauth: Option, + #[serde(default)] + pub has_error: bool, + #[serde(skip_serializing_if = "Option::is_none", default)] + pub error: Option, #[serde(flatten, default)] pub record_metadata: RecordMetadata, } +impl Connection { + pub fn mark_error(&mut self, error: &str) { + self.has_error = true; + self.error = Some(error.to_string()); + } +} + #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] #[cfg_attr(feature = "dummy", derive(fake::Dummy))] diff --git a/integrationos-domain/src/domain/event/emitted_events.rs b/integrationos-domain/src/domain/event/emitted_events.rs index 4b7af55d..c8efdd3b 100644 --- a/integrationos-domain/src/domain/event/emitted_events.rs +++ b/integrationos-domain/src/domain/event/emitted_events.rs @@ -1,10 +1,27 @@ -use crate::{ownership::Ownership, Id}; +use crate::Id; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; -pub struct RefreshTokenExpired { +#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DatabaseConnectionLost { pub connection_id: Id, - pub buildable_id: Ownership, + pub reason: Option, + pub schedule_on: Option, } -pub enum EmittedEvents { - RefreshTokenExpired(RefreshTokenExpired), +impl DatabaseConnectionLost { + pub fn as_event(&self) -> Value { + json!({ + "type": "DatabaseConnectionLost", + "connectionId": self.connection_id, + "reason": self.reason.clone(), + "scheduleOn": self.schedule_on, + }) + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct ConnectionLostReason { + pub reason: String, } diff --git a/integrationos-emit/Cargo.toml b/integrationos-emit/Cargo.toml index 327c635a..e87a01b1 100644 --- a/integrationos-emit/Cargo.toml +++ b/integrationos-emit/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" anyhow.workspace = true async-trait.workspace = true axum.workspace = true +axum-prometheus = "0.7" chrono.workspace = true dotenvy.workspace = true envconfig.workspace = true @@ -19,7 +20,7 @@ integrationos-domain = { path = "../integrationos-domain" } mongodb.workspace = true num_cpus.workspace = true reqwest = { workspace = true, features = ["rustls-tls"] } -reqwest-middleware = "0.4" +reqwest-middleware = { version = "0.4", features = ["json"] } reqwest-retry = "0.7" reqwest-tracing = "0.5.4" serde = { workspace = true , features = ["derive"] } diff --git a/integrationos-emit/src/algebra/event.rs b/integrationos-emit/src/algebra/event.rs index 005a3b77..3f7073d5 100644 --- a/integrationos-emit/src/algebra/event.rs +++ b/integrationos-emit/src/algebra/event.rs @@ -1,7 +1,10 @@ use crate::{domain::event::Event, server::AppState}; use async_trait::async_trait; use http::header::AUTHORIZATION; -use integrationos_domain::{ApplicationError, Claims, Id, IntegrationOSError, InternalError, Unit}; +use integrationos_domain::{ + emitted_events::{ConnectionLostReason, DatabaseConnectionLost}, + ApplicationError, Claims, Id, IntegrationOSError, InternalError, Unit, +}; #[async_trait] pub trait EventExt { @@ -12,14 +15,24 @@ pub trait EventExt { impl EventExt for Event { async fn side_effect(&self, ctx: &AppState, entity_id: Id) -> Result { match self { - Event::DatabaseConnectionLost { connection_id, .. } => { + Event::DatabaseConnectionLost(DatabaseConnectionLost { + connection_id, + reason, + .. + }) => { let base_path = &ctx.config.event_callback_url; let path = format!("{base_path}/database-connection-lost/{connection_id}"); let authorization = Claims::from_secret(ctx.config.jwt_secret.as_str())?; + let reason = reason + .clone() + .unwrap_or_else(|| "Unable to connect to database".to_string()); + + let payload = ConnectionLostReason { reason }; ctx.http_client .post(path) + .json(&payload) .header(AUTHORIZATION, format!("Bearer {authorization}")) .send() .await diff --git a/integrationos-emit/src/algebra/metrics.rs b/integrationos-emit/src/algebra/metrics.rs new file mode 100644 index 00000000..7056346b --- /dev/null +++ b/integrationos-emit/src/algebra/metrics.rs @@ -0,0 +1,103 @@ +use axum_prometheus::metrics::{counter, histogram, Counter, Histogram}; +use axum_prometheus::{ + metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle}, + utils::SECONDS_DURATION_BUCKETS, + GenericMetricLayer, Handle, PrometheusMetricLayerBuilder, AXUM_HTTP_REQUESTS_DURATION_SECONDS, +}; +use integrationos_domain::Unit; +use std::time::Duration; + +pub const EVENT_DURATION_KEY: &str = "event_duration_seconds"; +pub const EVENT_COUNT_KEY: &str = "event_count"; +pub const EVENT_ERRORS_KEY: &str = "event_errors"; +pub const EVENT_SUCCESS_KEY: &str = "event_success"; + +pub const DLQ_DURATION_KEY: &str = "dlq_duration_seconds"; +pub const DLQ_COUNT_KEY: &str = "dlq_count"; +pub const DLQ_ERRORS_KEY: &str = "dlq_errors"; +pub const DLQ_SUCCESS_KEY: &str = "dlq_success"; + +pub type MetricHandle = ( + GenericMetricLayer<'static, PrometheusHandle, Handle>, + PrometheusHandle, +); + +pub trait MetricExt { + fn succeeded(&self, value: u64) -> Unit; + fn errored(&self, value: u64) -> Unit; + fn duration(&self, value: Duration) -> Unit; +} + +pub struct MetricsRegistry { + pub event_count: Counter, + pub event_errors: Counter, + pub event_success: Counter, + pub event_duration: Histogram, +} + +impl MetricsRegistry { + pub fn noop() -> Self { + Self { + event_count: Counter::noop(), + event_errors: Counter::noop(), + event_success: Counter::noop(), + event_duration: Histogram::noop(), + } + } + + pub fn handle() -> MetricHandle { + PrometheusMetricLayerBuilder::new() + .with_metrics_from_fn(|| { + PrometheusBuilder::new() + .set_buckets_for_metric( + Matcher::Full(AXUM_HTTP_REQUESTS_DURATION_SECONDS.to_string()), + SECONDS_DURATION_BUCKETS, + ) + .expect("Unable to install request matcher") + .set_buckets_for_metric( + Matcher::Full(EVENT_DURATION_KEY.to_string()), + SECONDS_DURATION_BUCKETS, + ) + .expect("Unable to install event recorder matcher") + .set_buckets_for_metric( + Matcher::Full(DLQ_DURATION_KEY.to_string()), + SECONDS_DURATION_BUCKETS, + ) + .expect("Unable to install dlq recorder matcher") + .install_recorder() + .expect("Unable to setup metrics") + }) + .with_ignore_pattern("/metrics") + .build_pair() + } +} + +impl Default for MetricsRegistry { + fn default() -> Self { + Self { + event_count: counter!(EVENT_COUNT_KEY, "events" => "count"), + event_errors: counter!(EVENT_ERRORS_KEY, "events" => "errors"), + event_success: counter!(EVENT_SUCCESS_KEY, "events" => "success"), + event_duration: histogram!(EVENT_DURATION_KEY, "events" => "duration"), + // dlq_count: counter!(DLQ_COUNT_KEY, "dlq" => "count"), + // dlq_errors: counter!(DLQ_ERRORS_KEY, "dlq" => "errors"), + // dlq_success: counter!(DLQ_SUCCESS_KEY, "dlq" => "success"), + } + } +} + +impl MetricExt for MetricsRegistry { + fn succeeded(&self, value: u64) -> Unit { + self.event_success.increment(value); + self.event_count.increment(value); + } + + fn errored(&self, value: u64) -> Unit { + self.event_errors.increment(value); + self.event_count.increment(value); + } + + fn duration(&self, value: Duration) -> Unit { + self.event_duration.record(value); + } +} diff --git a/integrationos-emit/src/algebra/mod.rs b/integrationos-emit/src/algebra/mod.rs index 53f11265..43c955ac 100644 --- a/integrationos-emit/src/algebra/mod.rs +++ b/integrationos-emit/src/algebra/mod.rs @@ -1 +1,2 @@ pub mod event; +pub mod metrics; diff --git a/integrationos-emit/src/domain/config.rs b/integrationos-emit/src/domain/config.rs index b23de8d3..11194fdd 100644 --- a/integrationos-emit/src/domain/config.rs +++ b/integrationos-emit/src/domain/config.rs @@ -2,8 +2,8 @@ use crate::stream::EventStreamProvider; use envconfig::Envconfig; use fluvio::dataplane::types::PartitionId; use integrationos_domain::{ - cache::CacheConfig, - {database::DatabaseConfig, environment::Environment}, + cache::CacheConfig, database::DatabaseConfig, environment::Environment, IntegrationOSError, + InternalError, }; use std::{ fmt::{Display, Formatter}, @@ -19,6 +19,8 @@ pub struct EmitterConfig { pub worker_threads: Option, #[envconfig(from = "INTERNAL_SERVER_ADDRESS", default = "0.0.0.0:3001")] pub address: SocketAddr, + #[envconfig(from = "METRICS_SERVER_ADDRESS", default = "0.0.0.0:9004")] + pub metrics_address: SocketAddr, #[envconfig(from = "CACHE_SIZE", default = "10000")] pub cache_size: u64, #[envconfig(from = "ENVIRONMENT", default = "development")] @@ -52,8 +54,10 @@ pub struct EmitterConfig { default = "2thZ2UiOnsibmFtZSI6IlN0YXJ0dXBsa3NoamRma3NqZGhma3NqZGhma3NqZG5jhYtggfaP9ubmVjdGlvbnMiOjUwMDAwMCwibW9kdWxlcyI6NSwiZW5kcG9pbnRzIjo3b4e05e2-f050-401f-9822-44f43f71753c" )] pub jwt_secret: String, - #[envconfig(from = "STATEFUL_SET_POD_NAME")] - pub stateful_set_pod_name: Option, + #[envconfig(from = "STATEFULSET_POD_NAME")] + pub statefulset_pod_name: String, + #[envconfig(from = "PARTITION_COUNT")] + pub partition_count: u32, #[envconfig( from = "EVENT_CALLBACK_URL", default = "http://localhost:3005/v1/event-callbacks" @@ -68,23 +72,52 @@ pub struct EmitterConfig { } impl EmitterConfig { - /// Returns the partition id to consume from, beware that this assumes several things: - /// 1. The pod name is in the format of `topic-partition-id` (for example in a statefulset) - /// 2. Each pod will now have a 1-1 mapping to a partition - /// 3. It'll read the same partition for the DLQ and the main topic, which means that the DLQ - /// and main topic will have the same amount of partitions. + /// Determines the partition ID that this pod should consume from. /// - /// ## Warning - /// This is a very brittle assumption, and should be revisited if we ever have a more complex - /// setup or until this gets resolved: https://github.com/infinyon/fluvio/issues/760 - pub fn partition(&self) -> Option { - let pod_name = self.stateful_set_pod_name.as_ref()?; + /// This method relies on the pod's name to derive the partition ID. It assumes the following: + /// + /// 1. **Pod Naming Convention**: The pod name must follow the format `topic-partition-id`, + /// such as `example-topic-0` or `example-topic-1`. This is typical for StatefulSet pods. + /// 2. **1:1 Mapping**: Each pod has a one-to-one mapping with a specific partition. + /// 3. **Consistent Partition Counts**: The number of partitions in the main topic and + /// any associated Dead Letter Queue (DLQ) must be the same. This ensures consistent + /// partition-to-pod assignments for both topics. + /// + /// ### Behavior + /// - Extracts the partition ID from the pod name using the last hyphen-delimited segment. + /// - Computes the resulting partition by taking the modulus of the extracted ID with the + /// total number of partitions (`partition_count`). + /// + /// ### Assumptions & Limitations + /// - The method is **fragile** and depends on strict adherence to the pod naming convention. + /// - This approach will break in more complex setups or if pod naming conventions change. + /// - Any modifications to the partitioning or naming logic should revisit this method. + /// + /// ### Error Handling + /// Returns an error in the following cases: + /// - The pod name does not match the expected format. + /// - The partition ID cannot be parsed as a valid integer. + /// + /// ### Related Issue + /// For future improvements and a more robust solution, see: + /// [Fluvio Issue #760](https://github.com/infinyon/fluvio/issues/760) + pub fn partition(&self) -> Result { + let pod_name = self.statefulset_pod_name.clone(); + let partition_count = self.partition_count; if let Some((_, partition_id)) = pod_name.rsplit_once('-') { - let partition_id = PartitionId::from_str(partition_id).ok()?; - Some(partition_id) + let partition_id = PartitionId::from_str(partition_id).ok().ok_or({ + InternalError::configuration_error( + &format!("Could not parse partition from pod name: {}", pod_name), + None, + ) + })?; + Ok(partition_id % partition_count) } else { - None + Err(InternalError::configuration_error( + &format!("Could not parse partition from pod name: {}", pod_name), + None, + )) } } } @@ -92,6 +125,7 @@ impl EmitterConfig { impl Display for EmitterConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { writeln!(f, "SERVER_ADDRESS: {}", self.address)?; + writeln!(f, "METRICS_SERVER_ADDRESS: {}", self.metrics_address)?; writeln!(f, "CACHE_SIZE: {}", self.cache_size)?; writeln!(f, "SECRET: ****")?; writeln!(f, "ENVIRONMENT: {}", self.environment)?; @@ -121,7 +155,7 @@ impl Display for EmitterConfig { "PUSHER_SLEEP_DURATION_IN_MILLIS: {}", self.pusher_sleep_duration_millis )?; - writeln!(f, "STATEFUL_SET_POD_NAME: {:?}", self.stateful_set_pod_name)?; + writeln!(f, "STATEFUL_SET_POD_NAME: {:?}", self.statefulset_pod_name)?; writeln!(f, "PUSHER_MAX_CHUNK_SIZE: {}", self.pusher_max_chunk_size)?; writeln!(f, "JWT_SECRET: ****")?; writeln!(f, "EVENT_CALLBACK_URL: {}", self.event_callback_url)?; diff --git a/integrationos-emit/src/domain/event.rs b/integrationos-emit/src/domain/event.rs index 7f57755f..7585473d 100644 --- a/integrationos-emit/src/domain/event.rs +++ b/integrationos-emit/src/domain/event.rs @@ -1,7 +1,8 @@ use crate::{algebra::event::EventExt, server::AppState}; use chrono::Utc; use integrationos_domain::{ - prefix::IdPrefix, record_metadata::RecordMetadata, Id, IntegrationOSError, Unit, + emitted_events::DatabaseConnectionLost, prefix::IdPrefix, record_metadata::RecordMetadata, Id, + IntegrationOSError, Unit, }; use serde::{Deserialize, Serialize}; use strum::{AsRefStr, EnumString}; @@ -9,11 +10,7 @@ use strum::{AsRefStr, EnumString}; #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] #[serde(rename_all = "PascalCase", tag = "type")] pub enum Event { - #[serde(rename_all = "camelCase")] - DatabaseConnectionLost { - connection_id: Id, - schedule_on: Option, - }, + DatabaseConnectionLost(DatabaseConnectionLost), } impl Event { @@ -28,7 +25,9 @@ impl Event { pub fn scheduled_on(&self) -> Option { match self { - Event::DatabaseConnectionLost { schedule_on, .. } => *schedule_on, + Event::DatabaseConnectionLost(DatabaseConnectionLost { schedule_on, .. }) => { + *schedule_on + } } } } diff --git a/integrationos-emit/src/main.rs b/integrationos-emit/src/main.rs index dd325029..78b27030 100644 --- a/integrationos-emit/src/main.rs +++ b/integrationos-emit/src/main.rs @@ -5,8 +5,10 @@ use integrationos_domain::{ telemetry::{get_subscriber, init_subscriber}, Unit, }; -use integrationos_emit::{domain::config::EmitterConfig, server::Server}; -use std::time::Duration; +use integrationos_emit::{ + algebra::metrics::MetricsRegistry, domain::config::EmitterConfig, server::Server, +}; +use std::{sync::Arc, time::Duration}; use tokio_graceful_shutdown::{SubsystemHandle, Toplevel}; fn main() -> Result { @@ -24,7 +26,9 @@ fn main() -> Result { .build()? .block_on(async move { Toplevel::new(|subsys: SubsystemHandle| async move { - let server = Server::init(config.clone()) + let metrics = Arc::new(MetricsRegistry::handle()); + + let server = Server::init(config.clone(), &metrics) .await .expect("Failed to initialize server"); diff --git a/integrationos-emit/src/router/emitter.rs b/integrationos-emit/src/router/emitter.rs new file mode 100644 index 00000000..123c0344 --- /dev/null +++ b/integrationos-emit/src/router/emitter.rs @@ -0,0 +1,31 @@ +use crate::{logic::emitter, server::AppState}; +use axum::{middleware::from_fn, response::IntoResponse, routing::get, Json, Router}; +use http::StatusCode; +use integrationos_domain::telemetry::log_request_middleware; +use serde_json::json; +use std::sync::Arc; +use tower_http::{cors::CorsLayer, trace::TraceLayer}; + +pub async fn get_router(state: &Arc) -> Router> { + let path = format!("/{}", state.config.api_version); + let metrics_layer = state.metrics.as_ref().0.clone(); + Router::new() + .nest(&path, emitter::get_router()) + .route("/", get(get_root)) + .fallback(not_found_handler) + .layer(CorsLayer::permissive()) + .layer(from_fn(log_request_middleware)) + .layer(metrics_layer) + .layer(TraceLayer::new_for_http()) +} + +pub async fn get_root() -> impl IntoResponse { + Json(json!({ "success": true })) +} + +pub async fn not_found_handler() -> impl IntoResponse { + ( + StatusCode::NOT_FOUND, + Json(json!({ "error": "Not found", })), + ) +} diff --git a/integrationos-emit/src/router/metrics.rs b/integrationos-emit/src/router/metrics.rs new file mode 100644 index 00000000..6c4448a3 --- /dev/null +++ b/integrationos-emit/src/router/metrics.rs @@ -0,0 +1,8 @@ +use crate::server::AppState; +use axum::{routing::get, Router}; +use std::{future::ready, sync::Arc}; + +pub async fn get_router(state: &Arc) -> Router> { + let metrics_handle = state.metrics.as_ref().1.clone(); + Router::new().route("/metrics", get(move || ready(metrics_handle.render()))) +} diff --git a/integrationos-emit/src/router/mod.rs b/integrationos-emit/src/router/mod.rs index 020b1aad..d5129faa 100644 --- a/integrationos-emit/src/router/mod.rs +++ b/integrationos-emit/src/router/mod.rs @@ -1,29 +1,2 @@ -use crate::{logic::emitter, server::AppState}; -use axum::{middleware::from_fn, response::IntoResponse, routing::get, Json, Router}; -use http::StatusCode; -use integrationos_domain::telemetry::log_request_middleware; -use serde_json::json; -use std::sync::Arc; -use tower_http::{cors::CorsLayer, trace::TraceLayer}; - -pub async fn get_router(state: &Arc) -> Router> { - let path = format!("/{}", state.config.api_version); - Router::new() - .nest(&path, emitter::get_router()) - .route("/", get(get_root)) - .fallback(not_found_handler) - .layer(CorsLayer::permissive()) - .layer(from_fn(log_request_middleware)) - .layer(TraceLayer::new_for_http()) -} - -pub async fn get_root() -> impl IntoResponse { - Json(json!({ "success": true })) -} - -pub async fn not_found_handler() -> impl IntoResponse { - ( - StatusCode::NOT_FOUND, - Json(json!({ "error": "Not found", })), - ) -} +pub mod emitter; +pub mod metrics; diff --git a/integrationos-emit/src/server.rs b/integrationos-emit/src/server.rs index c6f7cdc2..9400339d 100644 --- a/integrationos-emit/src/server.rs +++ b/integrationos-emit/src/server.rs @@ -1,4 +1,5 @@ use crate::{ + algebra::metrics::MetricHandle, domain::{ config::EmitterConfig, deduplication::Deduplication, @@ -35,6 +36,7 @@ pub struct AppState { pub config: EmitterConfig, pub app_stores: AppStores, pub http_client: ClientWithMiddleware, + pub metrics: Arc, pub event_stream: Arc, } @@ -47,7 +49,7 @@ pub struct Server { } impl Server { - pub async fn init(config: EmitterConfig) -> AnyhowResult { + pub async fn init(config: EmitterConfig, metrics: &Arc) -> AnyhowResult { let client = Client::with_uri_str(&config.db_config.event_db_url).await?; let database = client.database(&config.db_config.event_db_name); @@ -94,6 +96,7 @@ impl Server { let state = Arc::new(AppState { config: config.clone(), app_stores, + metrics: metrics.clone(), http_client, event_stream: Arc::clone(&event_stream), }); @@ -107,18 +110,29 @@ impl Server { } pub async fn run(&self, subsys: SubsystemHandle) -> AnyhowResult { - let app = router::get_router(&self.state).await; + let server_app = router::emitter::get_router(&self.state).await; + let metrics_app = router::metrics::get_router(&self.state).await; - let app: Router = app.with_state(self.state.clone()); + let server_app: Router = server_app.with_state(self.state.clone()); + let metrics_app: Router = metrics_app.with_state(self.state.clone()); tracing::info!("Emitter server listening on {}", self.state.config.address); + tracing::info!( + "Metrics server listening on {}", + self.state.config.metrics_address + ); - let tcp_listener = TcpListener::bind(&self.state.config.address).await?; + let server_tcp_listener = TcpListener::bind(&self.state.config.address).await?; + let metrics_tcp_listener = TcpListener::bind(&self.state.config.metrics_address).await?; - axum::serve(tcp_listener, app) - .with_graceful_shutdown(Self::shutdown(subsys)) - .await - .map_err(|e| anyhow::anyhow!("Server error: {}", e)) + let server_handle = axum::serve(server_tcp_listener, server_app) + .with_graceful_shutdown(Self::shutdown(subsys)); + + let metrics_handle = axum::serve(metrics_tcp_listener, metrics_app); + + let (_, _) = tokio::join!(server_handle, metrics_handle); + + Ok(()) } async fn shutdown(subsys: SubsystemHandle) { diff --git a/integrationos-emit/src/stream/fluvio_driver.rs b/integrationos-emit/src/stream/fluvio_driver.rs index 969769a6..42e6c34d 100644 --- a/integrationos-emit/src/stream/fluvio_driver.rs +++ b/integrationos-emit/src/stream/fluvio_driver.rs @@ -1,5 +1,6 @@ use super::{EventStreamExt, EventStreamTopic}; use crate::{ + algebra::metrics::{MetricExt, MetricsRegistry}, domain::{ config::{EmitterConfig, EventStreamConfig}, deduplication::Deduplication, @@ -44,6 +45,7 @@ pub struct FluvioDriverImpl { pub dlq_consumer: ConsumerConfig, pub tgt_producer: TargetProducer, pub dlq_producer: DlqProducer, + metrics: MetricsRegistry, } impl FluvioDriverImpl { @@ -110,14 +112,9 @@ impl FluvioDriverImpl { ) })?; - let mut ext = ConsumerConfigExtBuilder::default(); - - if let Some(partition) = config.partition() { - ext = ext.partition(partition).to_owned(); - } - - let ext = ext + let ext = ConsumerConfigExtBuilder::default() .topic(consumer_topic) + .partition(config.partition()?) .offset_start(offset) .offset_consumer(consumer_id) .offset_strategy(OffsetManagementStrategy::Manual) @@ -148,14 +145,9 @@ impl FluvioDriverImpl { let consumer_id = format!("{consumer_id}-dlq"); - let mut ext = ConsumerConfigExtBuilder::default(); - - if let Some(partition) = config.partition() { - ext = ext.partition(partition).to_owned(); - } - - let ext = ext + let ext = ConsumerConfigExtBuilder::default() .topic(&topic) + .partition(config.partition()?) .offset_start(Offset::beginning()) .offset_consumer(consumer_id) .offset_strategy(OffsetManagementStrategy::Manual) @@ -174,6 +166,7 @@ impl FluvioDriverImpl { dlq_consumer, tgt_producer, dlq_producer, + metrics: MetricsRegistry::default(), }) } @@ -258,13 +251,26 @@ impl FluvioDriverImpl { #[async_trait] impl EventStreamExt for FluvioDriverImpl { - /** - * Publishes an event to the specified topic - * @param event - The event to publish - * @param target - The target topic of the event - * - * It serializes the event using serde_json and sends it to the specified topic. - */ + /// Publishes an event to the specified topic. + /// + /// # Parameters + /// - `event`: The event to publish, containing its metadata, payload, and associated data. + /// - `target`: The target topic to which the event should be published, either `Target` or `Dlq`. + /// + /// # Behavior + /// This method performs the following steps: + /// 1. Serializes the event into a binary payload using `serde_json`. + /// 2. Sends the serialized payload to the specified topic (`Target` or `DLQ`) using the appropriate producer. + /// + /// The method ensures proper error handling for serialization and publishing, logging relevant errors and returning an appropriate result. + /// + /// # Returns + /// - `Ok(Id)`: The `entity_id` of the published event, indicating successful publication. + /// - `Err(IntegrationOSError)`: If an error occurs during serialization or while sending the event to the target. + /// + /// # Errors + /// - **Serialization Error**: If the event cannot be serialized into a JSON payload. + /// - **Publishing Error**: If the Fluvio producer fails to send the event to the target topic. async fn publish( &self, event: EventEntity, @@ -296,15 +302,26 @@ impl EventStreamExt for FluvioDriverImpl { Ok(event.entity_id) } - /** - * Consumes events from the specified topic - * @param target - The target topic of the event - * @param subsys - The subsystem handle - * @param ctx - The application state - * - * It consumes events from the specified topic using the consumer stream. - * It processes each event and updates the event outcome in the events collection. - */ + /// Consumes events from the specified topic and processes them. + /// + /// # Parameters + /// - `target`: The target event stream topic to consume from, either the main target or the dead-letter queue (DLQ). + /// - `subsys`: A handle to the subsystem used for inter-process communication or coordination. + /// - `ctx`: A reference to the application state, providing access to shared resources and configurations. + /// + /// # Behavior + /// This method creates a consumer stream for the specified topic using the appropriate consumer configuration. + /// It processes each event from the stream and updates the event outcome in the events collection. The processing + /// logic is delegated to the `consume_topic` method, which handles event-specific tasks. + /// + /// # Returns + /// - `Ok(Unit)`: If the events are consumed and processed successfully. + /// - `Err(IntegrationOSError)`: If an error occurs during stream consumption or processing. + /// + /// # Errors + /// This method returns an error if: + /// - There is an issue initializing the consumer stream. + /// - An error occurs while processing events in the topic. async fn consume( &self, target: EventStreamTopic, @@ -321,111 +338,109 @@ impl EventStreamExt for FluvioDriverImpl { .consumer_with_config(consumer.ext.clone()) .await?; - // match self.consume_topic(target, &subsys, ctx, consumer, &mut stream) .await } - /** - * Processes an event from the consumer stream - * @param ctx - The application state - * @param target - The target topic of the event - * @param event - The event to process - * - * It first checks if the event is already processed, if so, it returns without processing it. - * If the event is not processed, it executes the side effect and updates the event outcome. - * - * Finally, it updates the event outcome in the events collection if the side effect was executed at least once. - */ + /// Processes an individual event from the consumer stream. + /// + /// # Parameters + /// - `ctx`: A reference to the application state, which provides access to shared resources, configurations, and storage. + /// - `target`: The event stream topic that the event belongs to, either `Target` or `Dlq`. + /// - `event`: The event to be processed, containing its metadata, status, and logic for side effects. + /// + /// # Behavior + /// This method performs the following steps: + /// 1. **Deduplication Check**: Verifies if the event has already been processed by checking the deduplication store. If so, the method returns early. + /// 2. **Deduplication Record Creation**: If the event is not processed, it creates a deduplication record to prevent re-processing. + /// 3. **Event Processing**: + /// - Executes the event's side effect logic. + /// - Updates the event's outcome in the events store based on the success or failure of the side effect. + /// 4. **Error Handling**: + /// - If processing fails, the deduplication record is removed, and the event is published to the DLQ with updated retry metadata. + /// - If the event is in the DLQ and has exceeded the maximum allowed retries, it marks the event as permanently failed. + /// + /// The method distinguishes between events in the main `Target` topic and the `DLQ` (Dead Letter Queue), handling them differently based on their context and retry state. + /// + /// # Returns + /// - `Ok(Unit)`: If the event is successfully processed or deemed complete (even if moved to the DLQ). + /// - `Err(IntegrationOSError)`: If a critical error occurs during processing or storage operations. + /// + /// # Errors + /// - Returns an error if there are issues interacting with the deduplication store or the events store. + /// - Errors may also occur if publishing to the DLQ or executing side effects fails critically. async fn process( &self, ctx: &AppState, target: EventStreamTopic, event: &EventEntity, ) -> Result { - let is_processed = ctx - .app_stores - .deduplication - .get_one_by_id(&event.entity_id.to_string()) - .await - .map_err(|e| { - tracing::error!("Could not fetch deduplication record: {e}"); - InternalError::unknown("Could not fetch deduplication record", None) - })? - .is_some(); - - if is_processed { - tracing::info!("Event with id {} is already processed", event.entity_id); - return Ok(()); - } - - let insert_result = ctx - .app_stores - .deduplication - .create_one(&Deduplication { - entity_id: event.entity_id, - metadata: event.metadata.clone(), - }) - .await; - - if let Err(e) = insert_result { - tracing::error!("Could not create deduplication record: {e}"); - if e.is_unique_error() { + let task = { + let is_processed = ctx + .app_stores + .deduplication + .get_one_by_id(&event.entity_id.to_string()) + .await + .map_err(|e| { + tracing::error!("Could not fetch deduplication record: {e}"); + InternalError::unknown("Could not fetch deduplication record", None) + })? + .is_some(); + + if is_processed { + tracing::info!("Event with id {} is already processed", event.entity_id); return Ok(()); - } else { - return Err(e); } - } - - match target { - EventStreamTopic::Target => { - ctx.app_stores.events.create_one(event).await.map_err(|e| { - tracing::error!("Could not create event record: {e}"); - InternalError::unknown("Could not create event record", None) - })?; - - tracing::info!("Event with id {} is ready to be processed", event.entity_id); - let result = event - .side_effect(ctx) - .timed(|_, elapsed| { - tracing::info!( - "Side effect for entity id {} took {}ms", - event.entity_id, - elapsed.as_millis() - ) - }) - .await; - - update_event_outcome(ctx, event, EventStatus::executed()).await?; - if let Err(e) = result { - tracing::error!("Error processing event: {e}, removing deduplication record"); - delete_deduplication_record(ctx, event).await?; - - let outcome = EventStatus::errored(e.to_string(), 1); - let event = event.with_outcome(outcome.clone()); + let insert_result = ctx + .app_stores + .deduplication + .create_one(&Deduplication { + entity_id: event.entity_id, + metadata: event.metadata.clone(), + }) + .await; + + if let Err(e) = insert_result { + tracing::error!("Could not create deduplication record: {e}"); + if e.is_unique_error() { + return Ok(()); + } else { + return Err(e); + } + } - self.publish(event.clone(), EventStreamTopic::Dlq).await?; + match target { + EventStreamTopic::Target => { + ctx.app_stores.events.create_one(event).await.map_err(|e| { + tracing::error!("Could not create event record: {e}"); + InternalError::unknown("Could not create event record", None) + })?; - update_event_outcome(ctx, &event, outcome).await?; + tracing::info!("Event with id {} is ready to be processed", event.entity_id); + let result = event + .side_effect(ctx) + .timed(|_, elapsed| { + self.metrics.duration(elapsed); - return Ok(()); - } + tracing::info!( + "Side effect for entity id {} took {}ms", + event.entity_id, + elapsed.as_millis() + ) + }) + .await; - update_event_outcome(ctx, event, EventStatus::succeded(event.retries())).await?; - } - EventStreamTopic::Dlq => { - tracing::info!("Event with id {} is in DLQ", event.entity_id); - if event.retries() <= ctx.config.event_processing_max_retries { - let result = event.side_effect(ctx).await; + update_event_outcome(ctx, event, EventStatus::executed()).await?; if let Err(e) = result { + self.metrics.errored(1); tracing::error!( "Error processing event: {e}, removing deduplication record" ); delete_deduplication_record(ctx, event).await?; - let outcome = EventStatus::errored(e.to_string(), event.retries() + 1); + let outcome = EventStatus::errored(e.to_string(), 1); let event = event.with_outcome(outcome.clone()); self.publish(event.clone(), EventStreamTopic::Dlq).await?; @@ -437,22 +452,62 @@ impl EventStreamExt for FluvioDriverImpl { update_event_outcome(ctx, event, EventStatus::succeded(event.retries())) .await?; - } else { - tracing::info!("Giving up on event with id {}", event.entity_id); - // this is the case where we exhausted the retries, now - // the error is updated and not sent to the target topic - let error = event.error().unwrap_or_default() - + ".\n Exhausted retries, cannot process event"; + } + EventStreamTopic::Dlq => { + tracing::info!("Event with id {} is in DLQ", event.entity_id); + if event.retries() <= ctx.config.event_processing_max_retries { + let result = event.side_effect(ctx).await; + + if let Err(e) = result { + tracing::error!( + "Error processing event: {e}, removing deduplication record" + ); + delete_deduplication_record(ctx, event).await?; + + let outcome = EventStatus::errored(e.to_string(), event.retries() + 1); + let event = event.with_outcome(outcome.clone()); + + self.publish(event.clone(), EventStreamTopic::Dlq).await?; + + update_event_outcome(ctx, &event, outcome).await?; - update_event_outcome(ctx, event, EventStatus::errored(error, event.retries())) + return Ok(()); + } + + update_event_outcome(ctx, event, EventStatus::succeded(event.retries())) + .await?; + } else { + tracing::info!("Giving up on event with id {}", event.entity_id); + // this is the case where we exhausted the retries, now + // the error is updated and not sent to the target topic + let error = event.error().unwrap_or_default() + + ".\n Exhausted retries, cannot process event"; + + update_event_outcome( + ctx, + event, + EventStatus::errored(error, event.retries()), + ) .await?; - // TODO: create an alert on grafana + // TODO: create an alert on grafana + } } } - } - Ok(()) + Ok(()) + }; + + match task { + Ok(_) => { + self.metrics.succeeded(1); + Ok(()) + } + Err(e) => { + self.metrics.errored(1); + Err(e) + } + } } } diff --git a/integrationos-emit/tests/context.rs b/integrationos-emit/tests/context.rs index 6994464b..3296ae2b 100644 --- a/integrationos-emit/tests/context.rs +++ b/integrationos-emit/tests/context.rs @@ -1,6 +1,7 @@ use envconfig::Envconfig; use http::{Method, StatusCode}; use integrationos_domain::{IntegrationOSError, InternalError, Unit}; +use integrationos_emit::algebra::metrics::{MetricHandle, MetricsRegistry}; use integrationos_emit::domain::config::EmitterConfig; use integrationos_emit::server::Server; use mockito::{Server as MockServer, ServerGuard}; @@ -8,6 +9,7 @@ use serde::{de::DeserializeOwned, Serialize}; use serde_json::Value; use std::error::Error; use std::fmt::Debug; +use std::sync::Arc; use std::{collections::HashMap, sync::OnceLock, time::Duration}; use testcontainers_modules::{ mongo::Mongo, @@ -22,6 +24,7 @@ use uuid::Uuid; static DOCKER: OnceLock = OnceLock::new(); static MONGO: OnceLock> = OnceLock::new(); static TRACING: OnceLock = OnceLock::new(); +static METRICS: OnceLock> = OnceLock::new(); pub struct TestServer { pub port: u16, @@ -36,7 +39,7 @@ pub struct ApiResponse { } impl TestServer { - pub async fn new(stream: bool) -> Result { + pub async fn new() -> Result { TRACING.get_or_init(|| { let filter = EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) @@ -44,6 +47,7 @@ impl TestServer { tracing_subscriber::fmt().with_env_filter(filter).init(); }); + let metrics = METRICS.get_or_init(|| Arc::new(MetricsRegistry::handle())); let docker = DOCKER.get_or_init(Default::default); let mongo = MONGO.get_or_init(|| docker.run(Mongo)); let port = mongo.get_host_port_ipv4(27017); @@ -51,17 +55,30 @@ impl TestServer { let database_uri = format!("mongodb://127.0.0.1:{port}/?directConnection=true"); let database_name = Uuid::new_v4().to_string(); - let port = TcpListener::bind("127.0.0.1:0") + let server_port = TcpListener::bind("127.0.0.1:0") .await .expect("Failed to bind to port") .local_addr() .expect("Failed to get local address") .port(); - let mut config = vec![ + let metrics_port = TcpListener::bind("127.0.0.1:0") + .await + .expect("Failed to bind to port") + .local_addr() + .expect("Failed to get local address") + .port(); + + let mock_server = MockServer::new_async().await; + let mock_uri = mock_server.url(); + let config = vec![ ( "INTERNAL_SERVER_ADDRESS".to_string(), - format!("0.0.0.0:{port}"), + format!("0.0.0.0:{server_port}"), + ), + ( + "METRICS_SERVER_ADDRESS".to_string(), + format!("0.0.0.0:{metrics_port}"), ), ("CONTROL_DATABASE_URL".to_string(), database_uri.clone()), ("CONTROL_DATABASE_NAME".to_string(), database_name.clone()), @@ -69,37 +86,36 @@ impl TestServer { ("CONTEXT_DATABASE_NAME".to_string(), database_name.clone()), ("EVENT_DATABASE_URL".to_string(), database_uri.clone()), ("EVENT_DATABASE_NAME".to_string(), database_name.clone()), - ]; - - let mock_server = MockServer::new_async().await; - - if stream { - let uri = mock_server.url(); - - config.push(("EVENT_STREAM_PROVIDER".to_string(), "fluvio".to_string())); - config.push(("EVENT_STREAM_PORT".to_string(), "9103".to_string())); - config.push(( + ( + "STATEFULSET_POD_NAME".to_string(), + "event-emit-0".to_string(), + ), + ("PARTITION_COUNT".to_string(), "1".to_string()), + ("ENVIRONMENT".to_string(), "test".to_string()), + ("EVENT_STREAM_PROVIDER".to_string(), "fluvio".to_string()), + ("EVENT_STREAM_PORT".to_string(), "9103".to_string()), + ( "EVENT_STREAM_PRODUCER_TOPIC".to_string(), "events".to_string(), - )); - config.push(( + ), + ( "EVENT_STREAM_CONSUMER_TOPIC".to_string(), "events".to_string(), - )); - config.push(( + ), + ( "EVENT_STREAM_CONSUMER_GROUP".to_string(), "event-all-partitions-consumer".to_string(), - )); - config.push(( + ), + ( "EVENT_CALLBACK_URL".to_string(), - format!("{uri}/v1/event-callbacks"), - )); - } + format!("{mock_uri}/v1/event-callbacks"), + ), + ]; let config = EmitterConfig::init_from_hashmap(&HashMap::from_iter(config)) .expect("Failed to initialize storage config"); - let server = Server::init(config.clone()) + let server = Server::init(config.clone(), metrics) .await .expect("Failed to initialize storage"); @@ -117,7 +133,7 @@ impl TestServer { let client = reqwest::Client::new(); Ok(Self { - port, + port: server_port, client, mock_server, }) diff --git a/integrationos-emit/tests/http/emitter.rs b/integrationos-emit/tests/http/emitter.rs index 6eae74ca..2cb15143 100644 --- a/integrationos-emit/tests/http/emitter.rs +++ b/integrationos-emit/tests/http/emitter.rs @@ -14,7 +14,7 @@ const PARALLEL_REQUESTS: usize = 10; #[tokio::test] async fn test_concurrent_requests() -> Result { - let server = TestServer::new(true).await?; + let server = TestServer::new().await?; let payload = json!({ "type": "DatabaseConnectionLost", "connectionId": "conn::GAL2svWJp9k::MtmXaau5Qf6R5n3Y-L9ejQ" @@ -76,7 +76,7 @@ async fn test_concurrent_requests() -> Result { #[tokio::test] async fn test_event_processed() -> Result { - let mut server = TestServer::new(true).await?; + let mut server = TestServer::new().await?; let id = Id::now(IdPrefix::Connection).to_string(); let payload = json!({ diff --git a/integrationos-event/tests/mock_destination.rs b/integrationos-event/tests/mock_destination.rs index 65b410ee..2a9af9ec 100644 --- a/integrationos-event/tests/mock_destination.rs +++ b/integrationos-event/tests/mock_destination.rs @@ -100,6 +100,8 @@ pub async fn seed_db(config: &EventCoreConfig, base_url: String) -> Id { key: "key".into(), name: None, group: "group".to_string(), + has_error: false, + error: None, platform: "platform".to_string().into(), environment: Environment::Live, secrets_service_id: "secrets_service_id".to_string(),