Skip to content

Commit 22901bf

Browse files
authored
chore: hotfix for alerts (#1283)
- Added helper methods to fetch base query, eval details - Ensured DataFusion accepts columns and table names as is - Removed clockwerk scheduler - Modifed alert config JSON - Modified triggered alert message
1 parent f79a495 commit 22901bf

File tree

9 files changed

+541
-417
lines changed

9 files changed

+541
-417
lines changed

.github/workflows/coverage.yaml

+28-2
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,35 @@ jobs:
6262
exit 1
6363
fi
6464
65+
- name: Find and fix librdkafka CMakeLists.txt
66+
run: |
67+
# Download the package first so it's in the registry
68+
cargo fetch
69+
70+
# Find the rdkafka-sys package directory
71+
RDKAFKA_SYS_DIR=$(find ~/.cargo/registry/src -name "rdkafka-sys-*" -type d | head -n 1)
72+
echo "Found rdkafka-sys at: $RDKAFKA_SYS_DIR"
73+
74+
# Find the librdkafka CMakeLists.txt file
75+
CMAKE_FILE="$RDKAFKA_SYS_DIR/librdkafka/CMakeLists.txt"
76+
77+
if [ -f "$CMAKE_FILE" ]; then
78+
echo "Found CMakeLists.txt at: $CMAKE_FILE"
79+
80+
# Make a backup of the original file
81+
cp "$CMAKE_FILE" "$CMAKE_FILE.bak"
82+
83+
# Replace the minimum required version
84+
sed -i 's/cmake_minimum_required(VERSION 3.2)/cmake_minimum_required(VERSION 3.5)/' "$CMAKE_FILE"
85+
86+
echo "Modified CMakeLists.txt - before and after comparison:"
87+
diff "$CMAKE_FILE.bak" "$CMAKE_FILE" || true
88+
else
89+
echo "Could not find librdkafka CMakeLists.txt file!"
90+
exit 1
91+
fi
92+
6593
- name: Check with clippy
66-
env:
67-
CMAKE_FLAGS: "-DCMAKE_POLICY_VERSION_MINIMUM=3.5"
6894
run: cargo hack clippy --verbose --each-feature --no-dev-deps -- -D warnings
6995

7096
- name: Test default feature set

src/alerts/alerts_utils.rs

+117-57
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@ use datafusion::{
2525
min_max::{max, min},
2626
sum::sum,
2727
},
28+
logical_expr::{BinaryExpr, Literal, Operator},
2829
prelude::{col, lit, DataFrame, Expr},
2930
};
3031
use tracing::trace;
3132

3233
use crate::{
33-
alerts::AggregateCondition,
34+
alerts::LogicalOperator,
3435
parseable::PARSEABLE,
3536
query::{TableScanVisitor, QUERY_SESSION},
3637
rbac::{
@@ -42,8 +43,8 @@ use crate::{
4243
};
4344

4445
use super::{
45-
AggregateConfig, AggregateOperation, AggregateResult, Aggregations, AlertConfig, AlertError,
46-
AlertOperator, AlertState, ConditionConfig, Conditions, ALERTS,
46+
AggregateConfig, AggregateFunction, AggregateResult, Aggregates, AlertConfig, AlertError,
47+
AlertOperator, AlertState, ConditionConfig, Conditions, WhereConfigOperator, ALERTS,
4748
};
4849

4950
async fn get_tables_from_query(query: &str) -> Result<TableScanVisitor, AlertError> {
@@ -102,23 +103,23 @@ pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
102103
trace!("RUNNING EVAL TASK FOR- {alert:?}");
103104

104105
let query = prepare_query(alert).await?;
105-
let base_df = execute_base_query(&query, &alert.query).await?;
106-
let agg_results = evaluate_aggregates(&alert.aggregate_config, &base_df).await?;
107-
let final_res = calculate_final_result(&alert.aggregate_config, &agg_results);
106+
let select_query = alert.get_base_query();
107+
let base_df = execute_base_query(&query, &select_query).await?;
108+
let agg_results = evaluate_aggregates(&alert.aggregates, &base_df).await?;
109+
let final_res = calculate_final_result(&alert.aggregates, &agg_results);
108110

109111
update_alert_state(alert, final_res, &agg_results).await?;
110112
Ok(())
111113
}
112114

113115
async fn prepare_query(alert: &AlertConfig) -> Result<crate::query::Query, AlertError> {
114-
let (start_time, end_time) = match &alert.eval_type {
115-
super::EvalConfig::RollingWindow(rolling_window) => {
116-
(&rolling_window.eval_start, &rolling_window.eval_end)
117-
}
116+
let (start_time, end_time) = match &alert.eval_config {
117+
super::EvalConfig::RollingWindow(rolling_window) => (&rolling_window.eval_start, "now"),
118118
};
119119

120120
let session_state = QUERY_SESSION.state();
121-
let raw_logical_plan = session_state.create_logical_plan(&alert.query).await?;
121+
let select_query = alert.get_base_query();
122+
let raw_logical_plan = session_state.create_logical_plan(&select_query).await?;
122123

123124
let time_range = TimeRange::parse_human_time(start_time, end_time)
124125
.map_err(|err| AlertError::CustomError(err.to_string()))?;
@@ -146,15 +147,15 @@ async fn execute_base_query(
146147
}
147148

148149
async fn evaluate_aggregates(
149-
agg_config: &Aggregations,
150+
agg_config: &Aggregates,
150151
base_df: &DataFrame,
151152
) -> Result<Vec<AggregateResult>, AlertError> {
152153
let agg_filter_exprs = get_exprs(agg_config);
153154
let mut results = Vec::new();
154155

155156
let conditions = match &agg_config.operator {
156-
Some(_) => &agg_config.aggregate_conditions[0..2],
157-
None => &agg_config.aggregate_conditions[0..1],
157+
Some(_) => &agg_config.aggregate_config[0..2],
158+
None => &agg_config.aggregate_config[0..1],
158159
};
159160

160161
for ((agg_expr, filter), agg) in agg_filter_exprs.into_iter().zip(conditions) {
@@ -186,10 +187,10 @@ async fn evaluate_single_aggregate(
186187
let result = evaluate_condition(&agg.operator, final_value, agg.value);
187188

188189
let message = if result {
189-
agg.condition_config
190+
agg.conditions
190191
.as_ref()
191192
.map(|config| config.generate_filter_message())
192-
.or(Some(String::default()))
193+
.or(None)
193194
} else {
194195
None
195196
};
@@ -206,18 +207,17 @@ fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) -> b
206207
match operator {
207208
AlertOperator::GreaterThan => actual > expected,
208209
AlertOperator::LessThan => actual < expected,
209-
AlertOperator::EqualTo => actual == expected,
210-
AlertOperator::NotEqualTo => actual != expected,
211-
AlertOperator::GreaterThanEqualTo => actual >= expected,
212-
AlertOperator::LessThanEqualTo => actual <= expected,
213-
_ => unreachable!(),
210+
AlertOperator::Equal => actual == expected,
211+
AlertOperator::NotEqual => actual != expected,
212+
AlertOperator::GreaterThanOrEqual => actual >= expected,
213+
AlertOperator::LessThanOrEqual => actual <= expected,
214214
}
215215
}
216216

217-
fn calculate_final_result(agg_config: &Aggregations, results: &[AggregateResult]) -> bool {
217+
fn calculate_final_result(agg_config: &Aggregates, results: &[AggregateResult]) -> bool {
218218
match &agg_config.operator {
219-
Some(AggregateCondition::And) => results.iter().all(|r| r.result),
220-
Some(AggregateCondition::Or) => results.iter().any(|r| r.result),
219+
Some(LogicalOperator::And) => results.iter().all(|r| r.result),
220+
Some(LogicalOperator::Or) => results.iter().any(|r| r.result),
221221
None => results.first().is_some_and(|r| r.result),
222222
}
223223
}
@@ -228,8 +228,12 @@ async fn update_alert_state(
228228
agg_results: &[AggregateResult],
229229
) -> Result<(), AlertError> {
230230
if final_res {
231-
trace!("ALERT!!!!!!");
232231
let message = format_alert_message(agg_results);
232+
let message = format!(
233+
"{message}\nEvaluation Window: {}\nEvaluation Frequency: {}m",
234+
alert.get_eval_window(),
235+
alert.get_eval_frequency()
236+
);
233237
ALERTS
234238
.update_state(alert.id, AlertState::Triggered, Some(message))
235239
.await
@@ -249,8 +253,8 @@ fn format_alert_message(agg_results: &[AggregateResult]) -> String {
249253
for result in agg_results {
250254
if let Some(msg) = &result.message {
251255
message.extend([format!(
252-
"|{}({}) WHERE ({}) {} {} (ActualValue: {})|",
253-
result.config.agg,
256+
"\nCondition: {}({}) WHERE ({}) {} {}\nActualValue: {}\n",
257+
result.config.aggregate_function,
254258
result.config.column,
255259
msg,
256260
result.config.operator,
@@ -259,8 +263,8 @@ fn format_alert_message(agg_results: &[AggregateResult]) -> String {
259263
)]);
260264
} else {
261265
message.extend([format!(
262-
"|{}({}) {} {} (ActualValue: {})",
263-
result.config.agg,
266+
"\nCondition: {}({}) {} {}\nActualValue: {}\n",
267+
result.config.aggregate_function,
264268
result.config.column,
265269
result.config.operator,
266270
result.config.value,
@@ -305,17 +309,17 @@ fn get_final_value(aggregated_rows: Vec<RecordBatch>) -> f64 {
305309
/// returns a tuple of (aggregate expressions, filter expressions)
306310
///
307311
/// It calls get_filter_expr() to get filter expressions
308-
fn get_exprs(aggregate_config: &Aggregations) -> Vec<(Expr, Option<Expr>)> {
312+
fn get_exprs(aggregate_config: &Aggregates) -> Vec<(Expr, Option<Expr>)> {
309313
let mut agg_expr = Vec::new();
310314

311315
match &aggregate_config.operator {
312316
Some(op) => match op {
313-
AggregateCondition::And | AggregateCondition::Or => {
314-
let agg1 = &aggregate_config.aggregate_conditions[0];
315-
let agg2 = &aggregate_config.aggregate_conditions[1];
317+
LogicalOperator::And | LogicalOperator::Or => {
318+
let agg1 = &aggregate_config.aggregate_config[0];
319+
let agg2 = &aggregate_config.aggregate_config[1];
316320

317321
for agg in [agg1, agg2] {
318-
let filter_expr = if let Some(where_clause) = &agg.condition_config {
322+
let filter_expr = if let Some(where_clause) = &agg.conditions {
319323
let fe = get_filter_expr(where_clause);
320324

321325
trace!("filter_expr-\n{fe:?}");
@@ -331,9 +335,9 @@ fn get_exprs(aggregate_config: &Aggregations) -> Vec<(Expr, Option<Expr>)> {
331335
}
332336
},
333337
None => {
334-
let agg = &aggregate_config.aggregate_conditions[0];
338+
let agg = &aggregate_config.aggregate_config[0];
335339

336-
let filter_expr = if let Some(where_clause) = &agg.condition_config {
340+
let filter_expr = if let Some(where_clause) = &agg.conditions {
337341
let fe = get_filter_expr(where_clause);
338342

339343
trace!("filter_expr-\n{fe:?}");
@@ -353,23 +357,23 @@ fn get_exprs(aggregate_config: &Aggregations) -> Vec<(Expr, Option<Expr>)> {
353357
fn get_filter_expr(where_clause: &Conditions) -> Expr {
354358
match &where_clause.operator {
355359
Some(op) => match op {
356-
AggregateCondition::And => {
360+
LogicalOperator::And => {
357361
let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true)));
358362

359-
let expr1 = &where_clause.conditions[0];
360-
let expr2 = &where_clause.conditions[1];
363+
let expr1 = &where_clause.condition_config[0];
364+
let expr2 = &where_clause.condition_config[1];
361365

362366
for e in [expr1, expr2] {
363367
let ex = match_alert_operator(e);
364368
expr = expr.and(ex);
365369
}
366370
expr
367371
}
368-
AggregateCondition::Or => {
372+
LogicalOperator::Or => {
369373
let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(false)));
370374

371-
let expr1 = &where_clause.conditions[0];
372-
let expr2 = &where_clause.conditions[1];
375+
let expr1 = &where_clause.condition_config[0];
376+
let expr2 = &where_clause.condition_config[1];
373377

374378
for e in [expr1, expr2] {
375379
let ex = match_alert_operator(e);
@@ -379,30 +383,86 @@ fn get_filter_expr(where_clause: &Conditions) -> Expr {
379383
}
380384
},
381385
None => {
382-
let expr = &where_clause.conditions[0];
386+
let expr = &where_clause.condition_config[0];
383387
match_alert_operator(expr)
384388
}
385389
}
386390
}
387391

388392
fn match_alert_operator(expr: &ConditionConfig) -> Expr {
393+
// the form accepts value as a string
394+
// if it can be parsed as a number, then parse it
395+
// else keep it as a string
396+
let value = NumberOrString::from_string(expr.value.clone());
397+
398+
// for maintaining column case
399+
let column = format!(r#""{}""#, expr.column);
389400
match expr.operator {
390-
AlertOperator::GreaterThan => col(&expr.column).gt(lit(&expr.value)),
391-
AlertOperator::LessThan => col(&expr.column).lt(lit(&expr.value)),
392-
AlertOperator::EqualTo => col(&expr.column).eq(lit(&expr.value)),
393-
AlertOperator::NotEqualTo => col(&expr.column).not_eq(lit(&expr.value)),
394-
AlertOperator::GreaterThanEqualTo => col(&expr.column).gt_eq(lit(&expr.value)),
395-
AlertOperator::LessThanEqualTo => col(&expr.column).lt_eq(lit(&expr.value)),
396-
AlertOperator::Like => col(&expr.column).like(lit(&expr.value)),
397-
AlertOperator::NotLike => col(&expr.column).not_like(lit(&expr.value)),
401+
WhereConfigOperator::Equal => col(column).eq(lit(value)),
402+
WhereConfigOperator::NotEqual => col(column).not_eq(lit(value)),
403+
WhereConfigOperator::LessThan => col(column).lt(lit(value)),
404+
WhereConfigOperator::GreaterThan => col(column).gt(lit(value)),
405+
WhereConfigOperator::LessThanOrEqual => col(column).lt_eq(lit(value)),
406+
WhereConfigOperator::GreaterThanOrEqual => col(column).gt_eq(lit(value)),
407+
WhereConfigOperator::IsNull => col(column).is_null(),
408+
WhereConfigOperator::IsNotNull => col(column).is_not_null(),
409+
WhereConfigOperator::ILike => col(column).ilike(lit(&expr.value)),
410+
WhereConfigOperator::Contains => col(column).like(lit(&expr.value)),
411+
WhereConfigOperator::BeginsWith => Expr::BinaryExpr(BinaryExpr::new(
412+
Box::new(col(column)),
413+
Operator::RegexIMatch,
414+
Box::new(lit(format!("^{}", expr.value))),
415+
)),
416+
WhereConfigOperator::EndsWith => Expr::BinaryExpr(BinaryExpr::new(
417+
Box::new(col(column)),
418+
Operator::RegexIMatch,
419+
Box::new(lit(format!("{}$", expr.value))),
420+
)),
421+
WhereConfigOperator::DoesNotContain => col(column).not_ilike(lit(&expr.value)),
422+
WhereConfigOperator::DoesNotBeginWith => Expr::BinaryExpr(BinaryExpr::new(
423+
Box::new(col(column)),
424+
Operator::RegexNotIMatch,
425+
Box::new(lit(format!("^{}", expr.value))),
426+
)),
427+
WhereConfigOperator::DoesNotEndWith => Expr::BinaryExpr(BinaryExpr::new(
428+
Box::new(col(column)),
429+
Operator::RegexNotIMatch,
430+
Box::new(lit(format!("{}$", expr.value))),
431+
)),
398432
}
399433
}
434+
400435
fn match_aggregate_operation(agg: &AggregateConfig) -> Expr {
401-
match agg.agg {
402-
AggregateOperation::Avg => avg(col(&agg.column)),
403-
AggregateOperation::Count => count(col(&agg.column)),
404-
AggregateOperation::Min => min(col(&agg.column)),
405-
AggregateOperation::Max => max(col(&agg.column)),
406-
AggregateOperation::Sum => sum(col(&agg.column)),
436+
// for maintaining column case
437+
let column = format!(r#""{}""#, agg.column);
438+
match agg.aggregate_function {
439+
AggregateFunction::Avg => avg(col(column)),
440+
AggregateFunction::Count => count(col(column)),
441+
AggregateFunction::Min => min(col(column)),
442+
AggregateFunction::Max => max(col(column)),
443+
AggregateFunction::Sum => sum(col(column)),
444+
}
445+
}
446+
447+
enum NumberOrString {
448+
Number(f64),
449+
String(String),
450+
}
451+
452+
impl Literal for NumberOrString {
453+
fn lit(&self) -> Expr {
454+
match self {
455+
NumberOrString::Number(expr) => lit(*expr),
456+
NumberOrString::String(expr) => lit(expr.clone()),
457+
}
458+
}
459+
}
460+
impl NumberOrString {
461+
fn from_string(value: String) -> Self {
462+
if let Ok(num) = value.parse::<f64>() {
463+
NumberOrString::Number(num)
464+
} else {
465+
NumberOrString::String(value)
466+
}
407467
}
408468
}

0 commit comments

Comments
 (0)