From 63d2f1522cca2dc7bf1bec5683e7f43ebba4c92f Mon Sep 17 00:00:00 2001 From: Hengfei Yang Date: Fri, 1 Dec 2023 23:27:43 +0800 Subject: [PATCH] feat: add more variables for alerts (#1973) --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/common/infra/config.rs | 17 +- src/common/infra/db/dynamo.rs | 8 +- src/common/infra/db/etcd.rs | 8 +- src/common/infra/db/sled.rs | 8 +- src/common/infra/metrics.rs | 8 +- src/common/infra/storage/local.rs | 7 +- src/common/infra/storage/remote.rs | 7 +- src/common/infra/wal.rs | 2 +- src/common/meta/alert.rs | 340 ------- src/common/meta/alerts/destinations.rs | 80 ++ src/common/meta/alerts/mod.rs | 179 ++++ src/common/meta/alerts/templates.rs | 30 + src/common/meta/alerts/triggers.rs | 23 + src/common/meta/http.rs | 31 +- src/common/meta/mod.rs | 2 +- src/common/meta/organization.rs | 2 +- src/common/meta/telemetry.rs | 8 +- src/common/migration/meta.rs | 50 +- src/common/utils/mod.rs | 1 - src/common/utils/notification.rs | 180 ---- src/handler/grpc/request/metrics/querier.rs | 2 +- .../http/request/alerts/destinations.rs | 87 +- src/handler/http/request/alerts/mod.rs | 173 ++-- src/handler/http/request/alerts/templates.rs | 75 +- src/handler/http/request/functions/mod.rs | 24 +- src/handler/http/request/kv/mod.rs | 14 +- src/handler/http/request/logs/ingest.rs | 20 +- src/handler/http/request/metrics/ingest.rs | 8 +- src/handler/http/request/organization/mod.rs | 14 +- .../http/request/organization/settings.rs | 8 +- src/handler/http/request/prom/mod.rs | 32 +- src/handler/http/request/rum/ingest.rs | 12 +- src/handler/http/request/search/mod.rs | 16 +- src/handler/http/request/search/saved_view.rs | 30 +- src/handler/http/request/stream/mod.rs | 20 +- src/handler/http/request/traces/mod.rs | 10 +- src/handler/http/request/users/mod.rs | 14 +- src/handler/http/router/mod.rs | 152 ++-- src/handler/http/router/openapi.rs | 96 +- src/job/alert_manager.rs | 2 +- src/job/compact.rs | 3 +- src/job/metrics.rs | 12 + src/job/mmdb_downloader.rs | 90 +- src/job/mod.rs | 5 +- src/service/alert_manager.rs | 159 ---- src/service/alerts/alert_manager.rs | 126 +++ src/service/alerts/destinations.rs | 113 ++- src/service/alerts/mod.rs | 845 +++++++++++++----- src/service/alerts/templates.rs | 95 +- src/service/alerts/triggers.rs | 36 + src/service/db/alerts/destinations.rs | 50 +- src/service/db/alerts/mod.rs | 166 ++-- src/service/db/alerts/templates.rs | 42 +- src/service/db/{ => alerts}/triggers.rs | 62 +- src/service/db/metrics.rs | 14 - src/service/db/mod.rs | 1 - src/service/enrichment_table/geoip.rs | 2 +- src/service/file_list.rs | 2 +- src/service/functions.rs | 15 +- src/service/ingestion/grpc.rs | 4 +- src/service/ingestion/mod.rs | 64 +- src/service/logs/bulk.rs | 31 +- src/service/logs/gcs_pub_sub.rs | 39 +- src/service/logs/ingest.rs | 59 +- src/service/logs/json.rs | 39 +- src/service/logs/kinesis_firehose.rs | 26 +- src/service/logs/mod.rs | 80 +- src/service/logs/multi.rs | 31 +- src/service/logs/otlp_grpc.rs | 45 +- src/service/logs/otlp_http.rs | 63 +- src/service/logs/syslog.rs | 26 +- src/service/metrics/otlp_grpc.rs | 97 +- src/service/metrics/otlp_http.rs | 119 +-- src/service/metrics/prom.rs | 114 +-- src/service/mod.rs | 2 - src/service/organization.rs | 3 +- src/service/promql/aggregations/bottomk.rs | 3 +- .../promql/aggregations/count_values.rs | 13 +- src/service/promql/aggregations/mod.rs | 6 +- src/service/promql/aggregations/quantile.rs | 10 +- src/service/promql/aggregations/topk.rs | 3 +- src/service/promql/common.rs | 15 + src/service/search/grpc/mod.rs | 24 +- src/service/search/grpc/storage.rs | 14 +- src/service/search/grpc/wal.rs | 14 +- src/service/traces/mod.rs | 65 +- src/service/traces/otlp_http.rs | 69 +- src/service/triggers.rs | 94 -- src/service/usage/stats.rs | 3 +- src/service/users.rs | 2 +- web/src/components/alerts/AddAlert.vue | 156 ++-- web/src/components/alerts/AddAlert2.vue | 804 ----------------- web/src/components/alerts/AddTemplate.vue | 10 +- web/src/components/alerts/AlertList.vue | 149 +-- web/src/components/alerts/FieldsInput.vue | 3 +- web/src/components/alerts/ScheduledAlert.vue | 34 +- web/src/components/alerts/alerts.ts | 60 ++ web/src/components/functions/add.vue | 2 +- web/src/locales/languages/de.json | 52 +- web/src/locales/languages/en.json | 68 +- web/src/locales/languages/es.json | 52 +- web/src/locales/languages/fr.json | 52 +- web/src/locales/languages/hi.json | 52 +- web/src/locales/languages/it.json | 52 +- web/src/locales/languages/ja.json | 52 +- web/src/locales/languages/ko.json | 52 +- web/src/locales/languages/nl.json | 52 +- web/src/locales/languages/pt.json | 52 +- web/src/locales/languages/tr.json | 52 +- web/src/locales/languages/zh.json | 66 +- web/src/services/alerts.ts | 9 + .../unit/components/alerts/AlertList.spec.ts | 3 +- web/src/test/unit/mockData/alerts.ts | 6 +- web/src/ts/interfaces/alert.ts | 42 +- 116 files changed, 3151 insertions(+), 3600 deletions(-) delete mode 100644 src/common/meta/alert.rs create mode 100644 src/common/meta/alerts/destinations.rs create mode 100644 src/common/meta/alerts/mod.rs create mode 100644 src/common/meta/alerts/templates.rs create mode 100644 src/common/meta/alerts/triggers.rs delete mode 100644 src/common/utils/notification.rs delete mode 100644 src/service/alert_manager.rs create mode 100644 src/service/alerts/alert_manager.rs create mode 100644 src/service/alerts/triggers.rs rename src/service/db/{ => alerts}/triggers.rs (62%) delete mode 100644 src/service/triggers.rs delete mode 100644 web/src/components/alerts/AddAlert2.vue create mode 100644 web/src/components/alerts/alerts.ts diff --git a/Cargo.lock b/Cargo.lock index d71abd200a7..79a1979d7e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4404,7 +4404,7 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "openobserve" -version = "0.7.1" +version = "0.8.0" dependencies = [ "actix-cors", "actix-multipart", diff --git a/Cargo.toml b/Cargo.toml index 5cd1171e27f..d31dcbbcddc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ keywords = [ license = "AGPL-3.0" name = "openobserve" repository = "https://github.com/openobserve/openobserve/" -version = "0.7.1" +version = "0.8.0" publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/src/common/infra/config.rs b/src/common/infra/config.rs index e89066c84d2..b3f2494aa4a 100644 --- a/src/common/infra/config.rs +++ b/src/common/infra/config.rs @@ -29,7 +29,7 @@ use vector_enrichment::TableRegistry; use crate::common::{ meta::{ - alert::{AlertDestination, AlertList, DestinationTemplate, Trigger, TriggerTimer}, + alerts, functions::{StreamFunctionsList, Transform}, maxmind::MaxmindClient, organization::OrganizationSetting, @@ -158,12 +158,13 @@ pub static METRIC_CLUSTER_MAP: Lazy>>> = Lazy::new(|| Arc::new(tokio::sync::RwLock::new(AHashMap::new()))); pub static METRIC_CLUSTER_LEADER: Lazy>> = Lazy::new(|| Arc::new(tokio::sync::RwLock::new(AHashMap::new()))); -pub static STREAM_ALERTS: Lazy> = Lazy::new(DashMap::default); -pub static TRIGGERS: Lazy> = Lazy::new(DashMap::default); -pub static TRIGGERS_IN_PROCESS: Lazy> = Lazy::new(DashMap::default); -pub static ALERTS_TEMPLATES: Lazy> = +pub static STREAM_ALERTS: Lazy>> = Lazy::new(Default::default); -pub static ALERTS_DESTINATIONS: Lazy> = +pub static TRIGGERS: Lazy> = + Lazy::new(Default::default); +pub static ALERTS_TEMPLATES: Lazy> = + Lazy::new(Default::default); +pub static ALERTS_DESTINATIONS: Lazy> = Lazy::new(Default::default); pub static SYSLOG_ROUTES: Lazy> = Lazy::new(Default::default); pub static SYSLOG_ENABLED: Lazy>> = Lazy::new(|| Arc::new(RwLock::new(false))); @@ -382,7 +383,7 @@ pub struct Common { #[env_config(name = "ZO_DEFAULT_SCRAPE_INTERVAL", default = 15)] // Default scrape_interval value 15s pub default_scrape_interval: u32, - // logger timestamp local setup + // logger timestamp local setup, eg: %Y-%m-%dT%H:%M:%S #[env_config(name = "ZO_LOG_LOCAL_TIME_FORMAT", default = "")] pub log_local_time_format: String, #[env_config(name = "ZO_CIRCUIT_BREAKER_ENABLE", default = false)] @@ -416,6 +417,8 @@ pub struct Limit { pub query_timeout: u64, #[env_config(name = "ZO_INGEST_ALLOWED_UPTO", default = 5)] // in hours - in past pub ingest_allowed_upto: i64, + #[env_config(name = "ZO_IGNORE_FILE_RETENTION_BY_STREAM", default = false)] + pub ignore_file_retention_by_stream: bool, #[env_config(name = "ZO_LOGS_FILE_RETENTION", default = "hourly")] pub logs_file_retention: String, #[env_config(name = "ZO_TRACES_FILE_RETENTION", default = "hourly")] diff --git a/src/common/infra/db/dynamo.rs b/src/common/infra/db/dynamo.rs index 31aed925bd5..8f14114cda1 100644 --- a/src/common/infra/db/dynamo.rs +++ b/src/common/infra/db/dynamo.rs @@ -28,9 +28,11 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::{mpsc, OnceCell}; -use super::Event; -use super::Stats; -use crate::common::infra::{config::CONFIG, errors::*}; +use crate::common::infra::{ + config::CONFIG, + db::{Event, Stats}, + errors::*, +}; static DB: OnceCell = OnceCell::const_new(); static DB_CLIENT: OnceCell = OnceCell::const_new(); diff --git a/src/common/infra/db/etcd.rs b/src/common/infra/db/etcd.rs index c3b3deae05a..ab35e3e4a39 100644 --- a/src/common/infra/db/etcd.rs +++ b/src/common/infra/db/etcd.rs @@ -32,8 +32,12 @@ use tokio::{ time, }; -use super::{Event, EventData}; -use crate::common::infra::{cluster, config::CONFIG, errors::*}; +use crate::common::infra::{ + cluster, + config::CONFIG, + db::{Event, EventData}, + errors::*, +}; static ETCD_CLIENT: OnceCell = OnceCell::const_new(); diff --git a/src/common/infra/db/sled.rs b/src/common/infra/db/sled.rs index 5d27e62c706..718c59e0538 100644 --- a/src/common/infra/db/sled.rs +++ b/src/common/infra/db/sled.rs @@ -20,8 +20,12 @@ use once_cell::sync::Lazy; use std::sync::Arc; use tokio::{sync::mpsc, task::JoinHandle}; -use super::{Event, EventData}; -use crate::common::infra::{cluster, config::CONFIG, errors::*}; +use crate::common::infra::{ + cluster, + config::CONFIG, + db::{Event, EventData}, + errors::*, +}; pub static SLED_CLIENT: Lazy> = Lazy::new(connect_sled); diff --git a/src/common/infra/metrics.rs b/src/common/infra/metrics.rs index 5d2d74af7d9..319d093a5c0 100644 --- a/src/common/infra/metrics.rs +++ b/src/common/infra/metrics.rs @@ -493,12 +493,12 @@ pub static META_NUM_DASHBOARDS: Lazy = Lazy::new(|| { .expect("Metric created") }); -pub static OO_MEM_USAGE: Lazy = Lazy::new(|| { +pub static MEMORY_USAGE: Lazy = Lazy::new(|| { IntGaugeVec::new( - Opts::new("mem_usage", "Openobserve memory usage") + Opts::new("memory_usage", "Process memory usage") .namespace(NAMESPACE) .const_labels(create_const_labels()), - &["organization"], + &[], ) .expect("Metric created") }); @@ -631,7 +631,7 @@ fn register_metrics(registry: &Registry) { .register(Box::new(META_NUM_DASHBOARDS.clone())) .expect("Metric registered"); registry - .register(Box::new(OO_MEM_USAGE.clone())) + .register(Box::new(MEMORY_USAGE.clone())) .expect("Metric registered"); } diff --git a/src/common/infra/storage/local.rs b/src/common/infra/storage/local.rs index fb4b624883b..b75c6726eee 100644 --- a/src/common/infra/storage/local.rs +++ b/src/common/infra/storage/local.rs @@ -23,8 +23,11 @@ use object_store::{ use std::ops::Range; use tokio::io::AsyncWrite; -use super::{format_key, CONCURRENT_REQUESTS}; -use crate::common::infra::{config::CONFIG, metrics}; +use crate::common::infra::{ + config::CONFIG, + metrics, + storage::{format_key, CONCURRENT_REQUESTS}, +}; pub struct Local { client: LimitStore>, diff --git a/src/common/infra/storage/remote.rs b/src/common/infra/storage/remote.rs index 44707609c7a..7c76cb02f3d 100644 --- a/src/common/infra/storage/remote.rs +++ b/src/common/infra/storage/remote.rs @@ -23,8 +23,11 @@ use object_store::{ use std::ops::Range; use tokio::io::AsyncWrite; -use super::{format_key, CONCURRENT_REQUESTS}; -use crate::common::infra::{config::CONFIG, metrics}; +use crate::common::infra::{ + config::CONFIG, + metrics, + storage::{format_key, CONCURRENT_REQUESTS}, +}; pub struct Remote { client: LimitStore>, diff --git a/src/common/infra/wal.rs b/src/common/infra/wal.rs index 36e8ed12cb4..886343d92a7 100644 --- a/src/common/infra/wal.rs +++ b/src/common/infra/wal.rs @@ -333,7 +333,7 @@ impl RwFile { let time_now: DateTime = Utc::now(); let level_duration = partition_time_level.unwrap_or_default().duration(); - let ttl = if level_duration > 0 { + let ttl = if !CONFIG.limit.ignore_file_retention_by_stream && level_duration > 0 { let time_end_day = Utc .with_ymd_and_hms( time_now.year(), diff --git a/src/common/meta/alert.rs b/src/common/meta/alert.rs deleted file mode 100644 index 983967f0a1a..00000000000 --- a/src/common/meta/alert.rs +++ /dev/null @@ -1,340 +0,0 @@ -// Copyright 2023 Zinc Labs Inc. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use ahash::HashMap; -use serde::{Deserialize, Serialize}; -use std::fmt; -use utoipa::ToSchema; - -use super::{search::Query, StreamType}; -use crate::common::utils::json::{Map, Value}; - -#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)] -pub struct Alert { - #[serde(default)] - pub name: String, - #[serde(default)] - pub stream: String, - #[schema(value_type = Option)] - pub query: Option, - pub condition: Condition, - pub duration: i64, - pub frequency: i64, - pub time_between_alerts: i64, - pub destination: String, - #[serde(default)] - pub is_real_time: bool, - #[serde(skip_serializing_if = "Option::is_none")] - pub context_attributes: Option>, - #[serde(default)] - #[serde(skip_serializing_if = "Option::is_none")] - pub stream_type: Option, -} - -#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)] -pub struct AlertDestination { - pub name: Option, - pub url: String, - pub method: AlertHTTPType, - #[serde(default)] - pub skip_tls_verify: bool, - #[serde(skip_serializing_if = "Option::is_none")] - pub headers: Option>, - pub template: String, -} - -impl AlertDestination { - pub fn to_dest_resp(&self, template: Option) -> AlertDestinationResponse { - AlertDestinationResponse { - url: self.url.clone(), - method: self.method.clone(), - skip_tls_verify: self.skip_tls_verify, - headers: self.headers.clone(), - template, - name: self.name.clone().unwrap(), - } - } -} - -#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)] -pub struct AlertDestinationResponse { - pub name: String, - pub url: String, - pub method: AlertHTTPType, - #[serde(default)] - pub skip_tls_verify: bool, - #[serde(skip_serializing_if = "Option::is_none")] - pub headers: Option>, - pub template: Option, -} - -#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)] -pub enum AlertHTTPType { - #[default] - #[serde(rename = "post")] - POST, - #[serde(rename = "put")] - PUT, - #[serde(rename = "get")] - GET, -} - -impl fmt::Display for AlertHTTPType { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - AlertHTTPType::POST => write!(f, "post"), - AlertHTTPType::PUT => write!(f, "put"), - AlertHTTPType::GET => write!(f, "get"), - } - } -} - -#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)] -pub struct DestinationTemplate { - pub name: Option, - #[schema(value_type = Object)] - pub body: Value, - #[serde(rename = "isDefault")] - #[serde(default)] - pub is_default: Option, -} - -#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)] -pub enum AlertDestType { - #[default] - #[serde(rename = "slack")] - Slack, - #[serde(rename = "alertmanager")] - AlertManager, -} - -impl fmt::Display for AlertDestType { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - AlertDestType::Slack => write!(f, "slack"), - AlertDestType::AlertManager => write!(f, "alertmanager"), - } - } -} - -impl PartialEq for Alert { - fn eq(&self, other: &Self) -> bool { - self.name == other.name && self.stream == other.stream - } -} - -#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)] -pub struct AlertList { - pub list: Vec, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Trigger { - #[serde(default)] - pub timestamp: i64, - #[serde(default)] - pub is_valid: bool, - #[serde(default)] - pub alert_name: String, - #[serde(default)] - pub stream: String, - #[serde(default)] - pub org: String, - #[serde(default)] - pub last_sent_at: i64, - #[serde(default)] - pub count: i64, - #[serde(default)] - pub is_ingest_time: bool, - #[serde(default)] - pub stream_type: StreamType, - #[serde(default)] - pub parent_alert_deleted: bool, -} - -impl Default for Trigger { - fn default() -> Self { - Trigger { - timestamp: 0, - is_valid: true, - alert_name: String::new(), - stream: String::new(), - org: String::new(), - last_sent_at: 0, - count: 0, - stream_type: StreamType::Logs, - is_ingest_time: false, - parent_alert_deleted: false, - } - } -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct TriggerTimer { - #[serde(default)] - pub updated_at: i64, - #[serde(default)] - pub expires_at: i64, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct Condition { - pub column: String, - pub operator: AllOperator, - #[serde(default)] - pub ignore_case: Option, - #[schema(value_type = Object)] - pub value: Value, - pub is_numeric: Option, -} - -impl Evaluate for Condition { - fn evaluate(&self, row: Map) -> bool { - if !row.contains_key(&self.column) { - return false; - }; - - let evaluate_numeric = if self.is_numeric.is_some() { - self.is_numeric.unwrap() - } else { - matches!(row.get(&self.column).unwrap(), Value::Number(_)) - }; - - /* let evaluate_numeric = match row.get(&self.column).expect("column exists") { - Value::Number(number) => number.as_f64().unwrap() > 0.0, - Value::String(s) => s.is_empty(), - _ => false, - }; */ - - if evaluate_numeric { - let number = match row.get(&self.column).expect("column exists") { - Value::Number(number) => number, - _ => unreachable!("please make sure right value for is_numeric is set trigger"), - }; - - match self.operator { - AllOperator::EqualTo => number.as_f64().unwrap() == get_numeric_val(&self.value), - AllOperator::NotEqualTo => number.as_f64().unwrap() != get_numeric_val(&self.value), - AllOperator::GreaterThan => number.as_f64().unwrap() > get_numeric_val(&self.value), - AllOperator::GreaterThanEquals => { - number.as_f64().unwrap() >= get_numeric_val(&self.value) - } - AllOperator::LessThan => number.as_f64().unwrap() < get_numeric_val(&self.value), - AllOperator::LessThanEquals => { - number.as_f64().unwrap() <= get_numeric_val(&self.value) - } - _ => false, - } - } else { - let string = match row.get(&self.column).expect("column exists") { - Value::String(s) => s, - _ => unreachable!("please make sure right value for is_numeric is set trigger"), - }; - - if self.ignore_case.unwrap_or_default() { - match self.operator { - AllOperator::EqualTo => { - string.eq_ignore_ascii_case(self.value.as_str().unwrap()) - } - AllOperator::NotEqualTo => { - !string.eq_ignore_ascii_case(self.value.as_str().unwrap()) - } - AllOperator::Contains => string - .to_ascii_lowercase() - .contains(&self.value.as_str().unwrap().to_ascii_lowercase()), - AllOperator::NotContains => !string - .to_ascii_lowercase() - .contains(&self.value.as_str().unwrap().to_ascii_lowercase()), - _ => false, - } - } else { - match self.operator { - AllOperator::EqualTo => string.eq(self.value.as_str().unwrap()), - AllOperator::NotEqualTo => !string.eq(self.value.as_str().unwrap()), - AllOperator::Contains => string.contains(self.value.as_str().unwrap()), - AllOperator::NotContains => !string.contains(self.value.as_str().unwrap()), - _ => false, - } - } - } - } -} - -fn get_numeric_val(value: &Value) -> f64 { - if value.is_boolean() { - f64::INFINITY - } else if value.is_f64() { - value.as_f64().unwrap() - } else if value.is_i64() { - value.as_i64().unwrap() as f64 - } else if value.is_u64() { - value.as_u64().unwrap() as f64 - } else if value.is_string() { - match value.as_str().unwrap().to_string().parse::() { - Ok(val) => val, - Err(_) => f64::INFINITY, - } - } else { - f64::INFINITY - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)] -pub enum AllOperator { - #[serde(alias = "=")] - EqualTo, - #[serde(alias = "!=")] - NotEqualTo, - #[serde(alias = ">")] - GreaterThan, - #[serde(alias = ">=")] - GreaterThanEquals, - #[serde(alias = "<")] - LessThan, - #[serde(alias = "<=")] - LessThanEquals, - Contains, - NotContains, -} - -impl Default for AllOperator { - fn default() -> Self { - Self::EqualTo - } -} - -pub trait Evaluate { - fn evaluate(&self, row: Map) -> bool; -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::common::utils::json::json; - - #[test] - fn test_evaluate() { - let condition = Condition { - column: "occurrence".to_owned(), - operator: AllOperator::GreaterThanEquals, - ignore_case: None, - value: json!("5"), - is_numeric: None, - }; - let row = json!({"Country":"USA","occurrence": 10}); - condition.evaluate(row.as_object().unwrap().clone()); - } -} diff --git a/src/common/meta/alerts/destinations.rs b/src/common/meta/alerts/destinations.rs new file mode 100644 index 00000000000..5580730fe59 --- /dev/null +++ b/src/common/meta/alerts/destinations.rs @@ -0,0 +1,80 @@ +// Copyright 2023 Zinc Labs Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use ahash::HashMap; +use serde::{Deserialize, Serialize}; +use std::fmt; +use utoipa::ToSchema; + +use super::templates::Template; + +#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)] +pub struct Destination { + #[serde(default)] + pub name: String, + pub url: String, + pub method: HTTPType, + #[serde(default)] + pub skip_tls_verify: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub headers: Option>, + pub template: String, +} + +impl Destination { + pub fn with_template(&self, template: Template) -> DestinationWithTemplate { + DestinationWithTemplate { + name: self.name.clone(), + url: self.url.clone(), + method: self.method.clone(), + skip_tls_verify: self.skip_tls_verify, + headers: self.headers.clone(), + template, + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)] +pub struct DestinationWithTemplate { + pub name: String, + pub url: String, + pub method: HTTPType, + #[serde(default)] + pub skip_tls_verify: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub headers: Option>, + pub template: Template, +} + +#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)] +pub enum HTTPType { + #[default] + #[serde(rename = "post")] + POST, + #[serde(rename = "put")] + PUT, + #[serde(rename = "get")] + GET, +} + +impl fmt::Display for HTTPType { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + HTTPType::POST => write!(f, "post"), + HTTPType::PUT => write!(f, "put"), + HTTPType::GET => write!(f, "get"), + } + } +} diff --git a/src/common/meta/alerts/mod.rs b/src/common/meta/alerts/mod.rs new file mode 100644 index 00000000000..d9ea7645100 --- /dev/null +++ b/src/common/meta/alerts/mod.rs @@ -0,0 +1,179 @@ +// Copyright 2023 Zinc Labs Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use ahash::HashMap; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +use crate::common::utils::json::Value; + +use super::StreamType; + +pub mod destinations; +pub mod templates; +pub mod triggers; + +#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)] +pub struct Alert { + #[serde(default)] + pub name: String, + #[serde(default)] + pub org_id: String, + #[serde(default)] + pub stream_type: StreamType, + #[serde(default)] + pub stream_name: String, + #[serde(default)] + pub is_real_time: bool, + #[serde(default)] + pub query_condition: QueryCondition, + #[serde(default)] + pub trigger_condition: TriggerCondition, + pub destinations: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub context_attributes: Option>, + #[serde(default)] + pub description: String, + #[serde(default)] + pub enabled: bool, +} + +impl PartialEq for Alert { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + && self.stream_type == other.stream_type + && self.stream_name == other.stream_name + } +} + +impl Default for Alert { + fn default() -> Self { + Self { + name: "".to_string(), + org_id: "".to_string(), + stream_type: StreamType::default(), + stream_name: "".to_string(), + is_real_time: false, + query_condition: QueryCondition::default(), + trigger_condition: TriggerCondition::default(), + destinations: vec![], + context_attributes: None, + description: "".to_string(), + enabled: false, + } + } +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)] +pub struct TriggerCondition { + pub period: i64, // 10 minutes + pub operator: Operator, // >= + pub threshold: i64, // 3 times + #[serde(default)] + pub frequency: i64, // 1 minute + #[serde(default)] + pub silence: i64, // silence for 10 minutes after fire an alert +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)] +pub struct QueryCondition { + pub conditions: Option>, + pub sql: Option, + pub promql: Option, + #[serde(default)] + #[serde(rename = "type")] + pub query_type: QueryType, +} + +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, ToSchema)] +pub enum QueryType { + #[default] + #[serde(rename = "custom")] + Custom, + #[serde(rename = "sql")] + SQL, + #[serde(rename = "promql")] + PromQL, +} + +impl ToString for QueryType { + fn to_string(&self) -> String { + match self { + QueryType::Custom => "custom".to_string(), + QueryType::SQL => "sql".to_string(), + QueryType::PromQL => "promql".to_string(), + } + } +} + +impl From<&str> for QueryType { + fn from(s: &str) -> Self { + match s.to_lowercase().as_str() { + "custom" => QueryType::Custom, + "sql" => QueryType::SQL, + "promql" => QueryType::PromQL, + _ => QueryType::Custom, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +pub struct Condition { + pub column: String, + pub operator: Operator, + #[schema(value_type = Object)] + pub value: Value, + #[serde(default)] + pub ignore_case: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +pub enum Operator { + #[serde(rename = "=")] + EqualTo, + #[serde(rename = "!=")] + NotEqualTo, + #[serde(rename = ">")] + GreaterThan, + #[serde(rename = ">=")] + GreaterThanEquals, + #[serde(rename = "<")] + LessThan, + #[serde(rename = "<=")] + LessThanEquals, + Contains, + NotContains, +} + +impl Default for Operator { + fn default() -> Self { + Self::EqualTo + } +} + +impl ToString for Operator { + fn to_string(&self) -> String { + match self { + Operator::EqualTo => "=".to_string(), + Operator::NotEqualTo => "!=".to_string(), + Operator::GreaterThan => ">".to_string(), + Operator::GreaterThanEquals => ">=".to_string(), + Operator::LessThan => "<".to_string(), + Operator::LessThanEquals => "<=".to_string(), + Operator::Contains => "contains".to_string(), + Operator::NotContains => "not contains".to_string(), + } + } +} diff --git a/src/common/meta/alerts/templates.rs b/src/common/meta/alerts/templates.rs new file mode 100644 index 00000000000..a5bef713565 --- /dev/null +++ b/src/common/meta/alerts/templates.rs @@ -0,0 +1,30 @@ +// Copyright 2023 Zinc Labs Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +use crate::common::utils::json::Value; + +#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, ToSchema)] +pub struct Template { + #[serde(default)] + pub name: String, + #[schema(value_type = Object)] + pub body: Value, + #[serde(rename = "isDefault")] + #[serde(default)] + pub is_default: Option, +} diff --git a/src/common/meta/alerts/triggers.rs b/src/common/meta/alerts/triggers.rs new file mode 100644 index 00000000000..3612919a003 --- /dev/null +++ b/src/common/meta/alerts/triggers.rs @@ -0,0 +1,23 @@ +// Copyright 2023 Zinc Labs Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct Trigger { + pub next_run_at: i64, + pub is_realtime: bool, + pub is_silenced: bool, +} diff --git a/src/common/meta/http.rs b/src/common/meta/http.rs index 8f6dd2105e2..6982ab14f41 100644 --- a/src/common/meta/http.rs +++ b/src/common/meta/http.rs @@ -67,7 +67,13 @@ impl HttpResponse { } } - /// Send a bad request response in json format and associate the + /// Send a normal response in json format and associate the + /// provided message as `message` field. + pub fn ok(msg: impl ToString) -> ActixHttpResponse { + ActixHttpResponse::Ok().json(Self::message(StatusCode::OK.into(), msg.to_string())) + } + + /// Send a BadRequest response in json format and associate the /// provided error as `error` field. pub fn bad_request(error: impl ToString) -> ActixHttpResponse { ActixHttpResponse::BadRequest().json(Self::error( @@ -76,6 +82,29 @@ impl HttpResponse { )) } + /// Send a Forbidden response in json format and associate the + /// provided error as `error` field. + pub fn forbidden(error: impl ToString) -> ActixHttpResponse { + ActixHttpResponse::Forbidden() + .json(Self::error(StatusCode::FORBIDDEN.into(), error.to_string())) + } + + /// Send a NotFound response in json format and associate the + /// provided error as `error` field. + pub fn not_found(error: impl ToString) -> ActixHttpResponse { + ActixHttpResponse::NotFound() + .json(Self::error(StatusCode::NOT_FOUND.into(), error.to_string())) + } + + /// Send a InternalServerError response in json format and associate the + /// provided error as `error` field. + pub fn internal_error(error: impl ToString) -> ActixHttpResponse { + ActixHttpResponse::InternalServerError().json(Self::error( + StatusCode::INTERNAL_SERVER_ERROR.into(), + error.to_string(), + )) + } + /// Send a response in json format, status code is 200. /// The payload should be serde-serializable. pub fn json(payload: impl Serialize) -> ActixHttpResponse { diff --git a/src/common/meta/mod.rs b/src/common/meta/mod.rs index caf2b621592..5535e2f61cd 100644 --- a/src/common/meta/mod.rs +++ b/src/common/meta/mod.rs @@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize}; use utoipa::ToSchema; -pub mod alert; +pub mod alerts; pub mod common; pub mod dashboards; pub mod functions; diff --git a/src/common/meta/organization.rs b/src/common/meta/organization.rs index 13a4cc813af..a7e60c5ca50 100644 --- a/src/common/meta/organization.rs +++ b/src/common/meta/organization.rs @@ -18,7 +18,7 @@ use utoipa::ToSchema; use crate::common::infra::config::CONFIG; -use super::{alert::Alert, functions::Transform, stream::Stream}; +use super::{alerts::Alert, functions::Transform, stream::Stream}; pub const DEFAULT_ORG: &str = "default"; pub const CUSTOM: &str = "custom"; diff --git a/src/common/meta/telemetry.rs b/src/common/meta/telemetry.rs index b513fa3fdfb..f997a9774a0 100644 --- a/src/common/meta/telemetry.rs +++ b/src/common/meta/telemetry.rs @@ -243,10 +243,9 @@ pub async fn add_zo_info(mut data: HashMap) -> HashMap) -> HashMap. + use chrono::Utc; use crate::common::{ @@ -5,8 +20,7 @@ use crate::common::{ config::CONFIG, db::{self, Db}, }, - meta::alert::Trigger, - utils::{file::get_file_meta, json}, + utils::file::get_file_meta, }; const ITEM_PREFIXES: [&str; 13] = [ @@ -53,14 +67,14 @@ pub async fn load_meta_from_sled() -> Result<(), anyhow::Error> { ); for (key, value) in res.iter() { - let final_key; - let key = if key.starts_with("/trigger") { - let local_val: Trigger = json::from_slice(value).unwrap(); - final_key = format!("/trigger/{}/{}", local_val.org, local_val.alert_name); - &final_key - } else { - key - }; + // let final_key; + // let key = if key.starts_with("/trigger") { + // let local_val: Trigger = json::from_slice(value).unwrap(); + // final_key = format!("/trigger/{}/{}", local_val.org, local_val.alert_name); + // &final_key + // } else { + // key + // }; match dest.put(key, value.clone(), false).await { Ok(_) => {} Err(e) => { @@ -99,14 +113,14 @@ pub async fn load_meta_from_etcd() -> Result<(), anyhow::Error> { ); let mut count = 0; for (key, value) in res.iter() { - let final_key; - let key = if key.starts_with("/trigger") { - let local_val: Trigger = json::from_slice(value).unwrap(); - final_key = format!("/trigger/{}/{}", local_val.org, local_val.alert_name); - &final_key - } else { - key - }; + // let final_key; + // let key = if key.starts_with("/trigger") { + // let local_val: Trigger = json::from_slice(value).unwrap(); + // final_key = format!("/trigger/{}/{}", local_val.org, local_val.alert_name); + // &final_key + // } else { + // key + // }; match dest.put(key, value.clone(), false).await { Ok(_) => { count += 1; diff --git a/src/common/utils/mod.rs b/src/common/utils/mod.rs index 35f81d1e916..7aaa09cec82 100644 --- a/src/common/utils/mod.rs +++ b/src/common/utils/mod.rs @@ -23,7 +23,6 @@ pub mod functions; pub mod hasher; pub mod http; pub mod json; -pub mod notification; pub mod rand; pub mod schema; pub mod schema_ext; diff --git a/src/common/utils/notification.rs b/src/common/utils/notification.rs deleted file mode 100644 index 4b042b1c4d1..00000000000 --- a/src/common/utils/notification.rs +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright 2023 Zinc Labs Inc. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use std::error::Error as StdError; - -use crate::common::meta::alert::{self, Alert}; -use crate::common::utils::json::{self, Value}; -use crate::service::db; - -pub async fn send_notification( - alert: &Alert, - trigger: &alert::Trigger, -) -> Result<(), Box> { - let alert_type = match &trigger.is_ingest_time { - true => "Real time", - false => "Scheduled", - }; - let curr_ts = chrono::Utc::now().timestamp_micros(); - let local_dest = match db::alerts::destinations::get(&trigger.org, &alert.destination).await { - Ok(v) => v, - Err(_) => { - log::error!("Destination Not found: {}", &alert.destination); - return Ok(()); - } - }; - - let body = local_dest.template.unwrap().body; - let resp_str = json::to_string(&body).unwrap(); - - let mut resp = resp_str - .replace("{stream_name}", &trigger.stream) - .replace("{org_name}", &trigger.org) - .replace("{alert_name}", &trigger.alert_name) - .replace("{alert_type}", alert_type) - .replace("{timestamp}", &curr_ts.to_string()); - - // Replace contextual information with values if any from alert - if alert.context_attributes.is_some() { - for (key, value) in alert.context_attributes.as_ref().unwrap() { - resp = resp.replace(&format!("{{{key}}}"), value) - } - } - - let msg: Value = json::from_str(&resp).unwrap(); - let msg: Value = match &msg { - Value::String(obj) => match json::from_str(obj) { - Ok(obj) => obj, - Err(_) => msg, - }, - _ => msg, - }; - let client = if local_dest.skip_tls_verify { - reqwest::Client::builder() - .danger_accept_invalid_certs(true) - .build()? - } else { - reqwest::Client::new() - }; - match url::Url::parse(&local_dest.url) { - Ok(url) => { - let mut req = match local_dest.method { - alert::AlertHTTPType::POST => client.post(url), - alert::AlertHTTPType::PUT => client.put(url), - alert::AlertHTTPType::GET => client.get(url), - } - .header("Content-type", "application/json"); - - // Add additional headers if any from destination description - if local_dest.headers.is_some() { - for (key, value) in local_dest.headers.unwrap() { - if !key.is_empty() && !value.is_empty() { - req = req.header(key, value); - } - } - }; - - let resp = req.json(&msg).send().await; - match resp { - Ok(resp) => { - if !resp.status().is_success() { - log::error!("Notification sent error: {:?}", resp.bytes().await); - } - } - Err(err) => log::error!("Notification sending error {:?}", err), - } - } - Err(err) => { - log::error!("Notification sending error {:?}", err); - } - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::common::meta::alert::{AlertDestination, Condition, DestinationTemplate, Trigger}; - use crate::common::meta::search::Query; - use crate::common::meta::StreamType; - - #[actix_web::test] - async fn test_send_notification() { - let template = DestinationTemplate { - name: Some("testTemplate".to_string()), - body: "Test Body".into(), - is_default: Some(false), - }; - let _ = db::alerts::templates::set("default", "testTemplate", template).await; - - let destination = AlertDestination { - url: "http://dummy/alert".to_string(), - method: alert::AlertHTTPType::POST, - skip_tls_verify: false, - headers: None, - template: "testTemplate".to_string(), - name: Some("test".to_string()), - }; - let _ = db::alerts::destinations::set("default", "testDest", destination).await; - - let obj: Trigger = Trigger { - timestamp: chrono::Utc::now().timestamp_micros(), - is_valid: true, - alert_name: "Test Alert".to_string(), - stream: "olympics".to_string(), - stream_type: StreamType::Logs, - org: "default".to_string(), - last_sent_at: chrono::Utc::now().timestamp_micros(), - count: 1, - is_ingest_time: true, - parent_alert_deleted: false, - }; - let alert = Alert { - name: "testAlert".to_string(), - stream: "olympics".to_string(), - stream_type: Some(StreamType::Logs), - query: Some(Query { - sql: "select * from olympics".to_string(), - from: 0, - size: 0, - start_time: 0, - end_time: 0, - sort_by: None, - sql_mode: "full".to_string(), - query_type: "".to_string(), - track_total_hits: false, - query_context: None, - uses_zo_fn: false, - query_fn: None, - }), - condition: Condition { - column: "Country".to_string(), - operator: alert::AllOperator::EqualTo, - ignore_case: Some(false), - value: json::Value::String("USA".to_string()), - is_numeric: Some(false), - }, - duration: 5, - frequency: 1, - time_between_alerts: 10, - destination: "testDest".to_string(), - is_real_time: true, - context_attributes: None, - }; - - send_notification(&alert, &obj).await.unwrap(); - } -} diff --git a/src/handler/grpc/request/metrics/querier.rs b/src/handler/grpc/request/metrics/querier.rs index 726543b7ce3..51ece640b1e 100644 --- a/src/handler/grpc/request/metrics/querier.rs +++ b/src/handler/grpc/request/metrics/querier.rs @@ -99,7 +99,7 @@ impl Metrics for Querier { } }; - let pattern = format!("{wal_dir}/files/{org_id}/metrics/{stream_name}/",); + let pattern = format!("{wal_dir}/files/{org_id}/metrics/{stream_name}/"); let files = scan_files(&pattern); if files.is_empty() { return Ok(Response::new(resp)); diff --git a/src/handler/http/request/alerts/destinations.rs b/src/handler/http/request/alerts/destinations.rs index 1065ca1fc33..def16e5729f 100644 --- a/src/handler/http/request/alerts/destinations.rs +++ b/src/handler/http/request/alerts/destinations.rs @@ -13,12 +13,13 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use actix_web::{delete, get, http, post, web, HttpResponse, Responder}; +use actix_web::{delete, get, http, post, web, HttpResponse}; use std::io::Error; -use crate::common::meta::http::HttpResponse as MetaHttpResponse; -use crate::service::db; -use crate::{common::meta::alert::AlertDestination, service::alerts::destinations}; +use crate::common::meta::{ + alerts::destinations::Destination, http::HttpResponse as MetaHttpResponse, +}; +use crate::service::alerts::destinations; /** CreateDestination */ #[utoipa::path( @@ -32,74 +33,74 @@ use crate::{common::meta::alert::AlertDestination, service::alerts::destinations ("org_id" = String, Path, description = "Organization name"), ("destination_name" = String, Path, description = "Destination name"), ), - request_body(content = AlertDestination, description = "Destination data", content_type = "application/json"), + request_body(content = Destination, description = "Destination data", content_type = "application/json"), responses( - (status = 200, description="Success", content_type = "application/json", body = HttpResponse), + (status = 200, description = "Success", content_type = "application/json", body = HttpResponse), + (status = 400, description = "Error", content_type = "application/json", body = HttpResponse), ) )] #[post("/{org_id}/alerts/destinations/{destination_name}")] pub async fn save_destination( path: web::Path<(String, String)>, - dest: web::Json, + dest: web::Json, ) -> Result { let (org_id, name) = path.into_inner(); - let dest = dest.into_inner(); - - if db::alerts::templates::get(org_id.as_str(), &dest.template) - .await - .is_err() - { - return Ok(HttpResponse::BadRequest().json(MetaHttpResponse::error( - http::StatusCode::BAD_REQUEST.into(), - "Please specify valid template".to_string(), - ))); + match destinations::save(&org_id, &name, dest).await { + Ok(_) => Ok(MetaHttpResponse::ok("Alert destination saved")), + Err(e) => Ok(MetaHttpResponse::bad_request(e)), } - destinations::save_destination(org_id, name, dest).await } -/** ListDestinations */ +/** GetDestination */ #[utoipa::path( context_path = "/api", tag = "Alerts", - operation_id = "ListDestinations", + operation_id = "GetDestination", security( ("Authorization"= []) ), params( ("org_id" = String, Path, description = "Organization name"), + ("destination_name" = String, Path, description = "Destination name"), ), responses( - (status = 200, description="Success", content_type = "application/json", body = Vec), + (status = 200, description = "Success", content_type = "application/json", body = Destination), + (status = 404, description = "NotFound", content_type = "application/json", body = HttpResponse), ) )] -#[get("/{org_id}/alerts/destinations")] -async fn list_destinations(path: web::Path) -> impl Responder { - let org_id = path.into_inner(); - destinations::list_destinations(org_id).await +#[get("/{org_id}/alerts/destinations/{destination_name}")] +async fn get_destination(path: web::Path<(String, String)>) -> Result { + let (org_id, name) = path.into_inner(); + match destinations::get(&org_id, &name).await { + Ok(data) => Ok(MetaHttpResponse::json(data)), + Err(e) => Ok(MetaHttpResponse::not_found(e)), + } } -/** GetDestination */ +/** ListDestinations */ #[utoipa::path( context_path = "/api", tag = "Alerts", - operation_id = "GetDestination", + operation_id = "ListDestinations", security( ("Authorization"= []) ), params( ("org_id" = String, Path, description = "Organization name"), - ("destination_name" = String, Path, description = "Destination name"), ), responses( - (status = 200, description="Success", content_type = "application/json", body = AlertDestinationResponse), - (status = 404, description="NotFound", content_type = "application/json", body = HttpResponse), + (status = 200, description = "Success", content_type = "application/json", body = Vec), + (status = 400, description = "Error", content_type = "application/json", body = HttpResponse), ) )] -#[get("/{org_id}/alerts/destinations/{destination_name}")] -async fn get_destination(path: web::Path<(String, String)>) -> impl Responder { - let (org_id, name) = path.into_inner(); - destinations::get_destination(org_id, name).await +#[get("/{org_id}/alerts/destinations")] +async fn list_destinations(path: web::Path) -> Result { + let org_id = path.into_inner(); + match destinations::list(&org_id).await { + Ok(data) => Ok(MetaHttpResponse::json(data)), + Err(e) => Ok(MetaHttpResponse::bad_request(e)), + } } /** DeleteDestination */ @@ -115,13 +116,21 @@ async fn get_destination(path: web::Path<(String, String)>) -> impl Responder { ("destination_name" = String, Path, description = "Destination name"), ), responses( - (status = 200, description = "Success", content_type = "application/json", body = HttpResponse), - (status = 404, description = "NotFound", content_type = "application/json", body = HttpResponse), - (status = 500, description = "Error", content_type = "application/json", body = HttpResponse), + (status = 200, description = "Success", content_type = "application/json", body = HttpResponse), + (status = 403, description = "Forbidden", content_type = "application/json", body = HttpResponse), + (status = 404, description = "NotFound", content_type = "application/json", body = HttpResponse), + (status = 500, description = "Failure", content_type = "application/json", body = HttpResponse), ) )] #[delete("/{org_id}/alerts/destinations/{destination_name}")] -async fn delete_destination(path: web::Path<(String, String)>) -> impl Responder { +async fn delete_destination(path: web::Path<(String, String)>) -> Result { let (org_id, name) = path.into_inner(); - destinations::delete_destination(org_id, name).await + match destinations::delete(&org_id, &name).await { + Ok(_) => Ok(MetaHttpResponse::ok("Alert destination deleted")), + Err(e) => match e { + (http::StatusCode::FORBIDDEN, e) => Ok(MetaHttpResponse::forbidden(e)), + (http::StatusCode::NOT_FOUND, e) => Ok(MetaHttpResponse::not_found(e)), + (_, e) => Ok(MetaHttpResponse::internal_error(e)), + }, + } } diff --git a/src/handler/http/request/alerts/mod.rs b/src/handler/http/request/alerts/mod.rs index f20128d30ab..95cf2f6310a 100644 --- a/src/handler/http/request/alerts/mod.rs +++ b/src/handler/http/request/alerts/mod.rs @@ -13,11 +13,11 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use actix_web::{delete, get, http, post, put, web, HttpRequest, HttpResponse, Responder}; +use actix_web::{delete, get, http, post, put, web, HttpRequest, HttpResponse}; use ahash::AHashMap as HashMap; use std::io::Error; -use crate::common::meta::{alert::Alert, http::HttpResponse as MetaHttpResponse}; +use crate::common::meta::{alerts::Alert, http::HttpResponse as MetaHttpResponse}; use crate::common::utils::http::get_stream_type_from_request; use crate::service::alerts; @@ -39,7 +39,8 @@ pub mod templates; ), request_body(content = Alert, description = "Alert data", content_type = "application/json"), responses( - (status = 200, description="Success", content_type = "application/json", body = HttpResponse), + (status = 200, description = "Success", content_type = "application/json", body = HttpResponse), + (status = 400, description = "Error", content_type = "application/json", body = HttpResponse), ) )] #[post("/{org_id}/{stream_name}/alerts/{alert_name}")] @@ -49,26 +50,27 @@ pub async fn save_alert( req: HttpRequest, ) -> Result { let (org_id, stream_name, name) = path.into_inner(); - let query = web::Query::>::from_query(req.query_string()).unwrap(); let stream_type = match get_stream_type_from_request(&query) { Ok(v) => v.unwrap_or_default(), Err(e) => { - return Ok(HttpResponse::BadRequest().json(MetaHttpResponse::error( - http::StatusCode::BAD_REQUEST.into(), - e.to_string(), - ))) + return Ok(MetaHttpResponse::bad_request(e)); } }; - alerts::save_alert( + match alerts::save( &org_id, - &stream_name, stream_type, + &stream_name, &name, alert.into_inner(), ) .await + { + Ok(_) => Ok(MetaHttpResponse::ok("Alert saved")), + Err(e) => Ok(MetaHttpResponse::bad_request(e)), + } } + /** ListStreamAlerts */ #[utoipa::path( context_path = "/api", @@ -82,23 +84,31 @@ pub async fn save_alert( ("stream_name" = String, Path, description = "Stream name"), ), responses( - (status = 200, description="Success", content_type = "application/json", body = AlertList), + (status = 200, description = "Success", content_type = "application/json", body = HttpResponse), + (status = 400, description = "Error", content_type = "application/json", body = HttpResponse), ) )] #[get("/{org_id}/{stream_name}/alerts")] -async fn list_stream_alerts(path: web::Path<(String, String)>, req: HttpRequest) -> impl Responder { +async fn list_stream_alerts( + path: web::Path<(String, String)>, + req: HttpRequest, +) -> Result { let (org_id, stream_name) = path.into_inner(); let query = web::Query::>::from_query(req.query_string()).unwrap(); let stream_type = match get_stream_type_from_request(&query) { Ok(v) => v, Err(e) => { - return Ok(HttpResponse::BadRequest().json(MetaHttpResponse::error( - http::StatusCode::BAD_REQUEST.into(), - e.to_string(), - ))) + return Ok(MetaHttpResponse::bad_request(e)); } }; - alerts::list_alert(&org_id, Some(stream_name.as_str()), stream_type).await + match alerts::list(&org_id, stream_type, Some(stream_name.as_str())).await { + Ok(data) => { + let mut mapdata = HashMap::new(); + mapdata.insert("list", data); + Ok(MetaHttpResponse::json(mapdata)) + } + Err(e) => Ok(MetaHttpResponse::bad_request(e)), + } } /** ListAlerts */ @@ -113,23 +123,20 @@ async fn list_stream_alerts(path: web::Path<(String, String)>, req: HttpRequest) ("org_id" = String, Path, description = "Organization name"), ), responses( - (status = 200, description="Success", content_type = "application/json", body = AlertList), + (status = 200, description = "Success", content_type = "application/json", body = HttpResponse), ) )] #[get("/{org_id}/alerts")] -async fn list_alerts(path: web::Path, req: HttpRequest) -> impl Responder { +async fn list_alerts(path: web::Path) -> Result { let org_id = path.into_inner(); - let query = web::Query::>::from_query(req.query_string()).unwrap(); - let stream_type = match get_stream_type_from_request(&query) { - Ok(v) => v, - Err(e) => { - return Ok(HttpResponse::BadRequest().json(MetaHttpResponse::error( - http::StatusCode::BAD_REQUEST.into(), - e.to_string(), - ))) + match alerts::list(&org_id, None, None).await { + Ok(data) => { + let mut mapdata = HashMap::new(); + mapdata.insert("list", data); + Ok(MetaHttpResponse::json(mapdata)) } - }; - alerts::list_alert(&org_id, None, stream_type).await + Err(e) => Ok(MetaHttpResponse::bad_request(e)), + } } /** GetAlertByName */ @@ -146,24 +153,27 @@ async fn list_alerts(path: web::Path, req: HttpRequest) -> impl Responde ("alert_name" = String, Path, description = "Alert name"), ), responses( - (status = 200, description="Success", content_type = "application/json", body = Alert), - (status = 404, description="NotFound", content_type = "application/json", body = HttpResponse), + (status = 200, description = "Success", content_type = "application/json", body = Alert), + (status = 404, description = "NotFound", content_type = "application/json", body = HttpResponse), ) )] #[get("/{org_id}/{stream_name}/alerts/{alert_name}")] -async fn get_alert(path: web::Path<(String, String, String)>, req: HttpRequest) -> impl Responder { +async fn get_alert( + path: web::Path<(String, String, String)>, + req: HttpRequest, +) -> Result { let (org_id, stream_name, name) = path.into_inner(); let query = web::Query::>::from_query(req.query_string()).unwrap(); let stream_type = match get_stream_type_from_request(&query) { Ok(v) => v.unwrap_or_default(), Err(e) => { - return Ok(HttpResponse::BadRequest().json(MetaHttpResponse::error( - http::StatusCode::BAD_REQUEST.into(), - e.to_string(), - ))) + return Ok(MetaHttpResponse::bad_request(e)); } }; - alerts::get_alert(&org_id, &stream_name, stream_type, &name).await + match alerts::get(&org_id, stream_type, &stream_name, &name).await { + Ok(data) => Ok(MetaHttpResponse::json(data)), + Err(e) => Ok(MetaHttpResponse::not_found(e)), + } } /** DeleteAlert */ @@ -180,28 +190,79 @@ async fn get_alert(path: web::Path<(String, String, String)>, req: HttpRequest) ("alert_name" = String, Path, description = "Alert name"), ), responses( - (status = 200, description="Success", content_type = "application/json", body = HttpResponse), - (status = 404, description="NotFound", content_type = "application/json", body = HttpResponse), - (status = 500, description="Error", content_type = "application/json", body = HttpResponse), + (status = 200, description = "Success", content_type = "application/json", body = HttpResponse), + (status = 404, description = "NotFound", content_type = "application/json", body = HttpResponse), + (status = 500, description = "Failure", content_type = "application/json", body = HttpResponse), ) )] #[delete("/{org_id}/{stream_name}/alerts/{alert_name}")] async fn delete_alert( path: web::Path<(String, String, String)>, req: HttpRequest, -) -> impl Responder { +) -> Result { + let (org_id, stream_name, name) = path.into_inner(); + let query = web::Query::>::from_query(req.query_string()).unwrap(); + let stream_type = match get_stream_type_from_request(&query) { + Ok(v) => v.unwrap_or_default(), + Err(e) => { + return Ok(MetaHttpResponse::bad_request(e)); + } + }; + match alerts::delete(&org_id, stream_type, &stream_name, &name).await { + Ok(_) => Ok(MetaHttpResponse::ok("Alert deleted")), + Err(e) => match e { + (http::StatusCode::NOT_FOUND, e) => Ok(MetaHttpResponse::not_found(e)), + (_, e) => Ok(MetaHttpResponse::internal_error(e)), + }, + } +} + +/** EnableAlert */ +#[utoipa::path( + context_path = "/api", + tag = "Alerts", + operation_id = "EnableAlert", + security( + ("Authorization"= []) + ), + params( + ("org_id" = String, Path, description = "Organization name"), + ("stream_name" = String, Path, description = "Stream name"), + ("alert_name" = String, Path, description = "Alert name"), + ("value" = bool, Query, description = "Enable or disable alert"), + ), + responses( + (status = 200, description = "Success", content_type = "application/json", body = HttpResponse), + (status = 404, description = "NotFound", content_type = "application/json", body = HttpResponse), + (status = 500, description = "Failure", content_type = "application/json", body = HttpResponse), + ) +)] +#[put("/{org_id}/{stream_name}/alerts/{alert_name}/enable")] +async fn enable_alert( + path: web::Path<(String, String, String)>, + req: HttpRequest, +) -> Result { let (org_id, stream_name, name) = path.into_inner(); let query = web::Query::>::from_query(req.query_string()).unwrap(); let stream_type = match get_stream_type_from_request(&query) { Ok(v) => v.unwrap_or_default(), Err(e) => { - return Ok(HttpResponse::BadRequest().json(MetaHttpResponse::error( - http::StatusCode::BAD_REQUEST.into(), - e.to_string(), - ))) + return Ok(MetaHttpResponse::bad_request(e)); } }; - alerts::delete_alert(&org_id, &stream_name, stream_type, &name).await + let enable = match query.get("value") { + Some(v) => v.parse::().unwrap_or_default(), + None => false, + }; + let mut resp = HashMap::new(); + resp.insert("enabled".to_string(), enable); + match alerts::enable(&org_id, stream_type, &stream_name, &name, enable).await { + Ok(_) => Ok(MetaHttpResponse::json(resp)), + Err(e) => match e { + (http::StatusCode::NOT_FOUND, e) => Ok(MetaHttpResponse::not_found(e)), + (_, e) => Ok(MetaHttpResponse::internal_error(e)), + }, + } } /** TriggerAlert */ @@ -218,25 +279,29 @@ async fn delete_alert( ("alert_name" = String, Path, description = "Alert name"), ), responses( - (status = 200, description="Success", content_type = "application/json", body = HttpResponse), - (status = 404, description="NotFound", content_type = "application/json", body = HttpResponse), + (status = 200, description = "Success", content_type = "application/json", body = HttpResponse), + (status = 404, description = "NotFound", content_type = "application/json", body = HttpResponse), + (status = 500, description = "Failure", content_type = "application/json", body = HttpResponse), ) )] #[put("/{org_id}/{stream_name}/alerts/{alert_name}/trigger")] async fn trigger_alert( path: web::Path<(String, String, String)>, req: HttpRequest, -) -> impl Responder { +) -> Result { let (org_id, stream_name, name) = path.into_inner(); let query = web::Query::>::from_query(req.query_string()).unwrap(); let stream_type = match get_stream_type_from_request(&query) { Ok(v) => v.unwrap_or_default(), Err(e) => { - return Ok(HttpResponse::BadRequest().json(MetaHttpResponse::error( - http::StatusCode::BAD_REQUEST.into(), - e.to_string(), - ))) + return Ok(MetaHttpResponse::bad_request(e)); } }; - alerts::trigger_alert(&org_id, &stream_name, stream_type, &name).await + match alerts::trigger(&org_id, stream_type, &stream_name, &name).await { + Ok(_) => Ok(MetaHttpResponse::ok("Alert triggered")), + Err(e) => match e { + (http::StatusCode::NOT_FOUND, e) => Ok(MetaHttpResponse::not_found(e)), + (_, e) => Ok(MetaHttpResponse::internal_error(e)), + }, + } } diff --git a/src/handler/http/request/alerts/templates.rs b/src/handler/http/request/alerts/templates.rs index f2b64f20c56..b84fbf5d124 100644 --- a/src/handler/http/request/alerts/templates.rs +++ b/src/handler/http/request/alerts/templates.rs @@ -13,10 +13,11 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use actix_web::{delete, get, post, web, HttpResponse, Responder}; +use actix_web::{delete, get, http, post, web, HttpResponse}; use std::io::Error; -use crate::{common::meta::alert::DestinationTemplate, service::alerts::templates}; +use crate::common::meta::{alerts::templates::Template, http::HttpResponse as MetaHttpResponse}; +use crate::service::alerts::templates; /** CreateTemplate */ #[utoipa::path( @@ -30,62 +31,74 @@ use crate::{common::meta::alert::DestinationTemplate, service::alerts::templates ("org_id" = String, Path, description = "Organization name"), ("template_name" = String, Path, description = "Template name"), ), - request_body(content = DestinationTemplate, description = "Template data", content_type = "application/json"), + request_body(content = Template, description = "Template data", content_type = "application/json"), responses( - (status = 200, description="Success", content_type = "application/json", body = HttpResponse), + (status = 200, description = "Success", content_type = "application/json", body = HttpResponse), + (status = 400, description = "Error", content_type = "application/json", body = HttpResponse), ) )] #[post("/{org_id}/alerts/templates/{template_name}")] pub async fn save_template( path: web::Path<(String, String)>, - alert: web::Json, + tmpl: web::Json