Skip to content

Commit

Permalink
upload classification results to rockset with metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
PaliC committed Oct 19, 2023
1 parent 51657c1 commit 95c5eb6
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 24 deletions.
81 changes: 58 additions & 23 deletions aws/lambda/log-classifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,49 @@ use anyhow::Result;
use std::time::Instant;
use tracing::info;

use log_classifier::engine::evaluate_ruleset_by_position;
use log_classifier::engine::evaluate_ruleset_by_priority;
use log_classifier::engine::{evaluate_ruleset_by_position, evaluate_ruleset_by_priority};
use log_classifier::log::Log;
use log_classifier::network::{
download_log, get_dynamo_client, get_s3_client, upload_classification_dynamo,
};
use log_classifier::rule::RuleSet;
use log_classifier::rule_match::SerializedMatch;
use log_classifier::rule_match::{Match, SerializedMatch};

/// Newtype flag: when `true`, classification results are uploaded to DynamoDB.
struct ShouldWriteDynamo(bool);

/// Default depth of the failure-context stack. Kept as a `&str` (not a number)
/// so it can stand in for the missing `context_depth` query parameter and be
/// fed through the same `parse::<usize>()` path.
static CONTEXT_DEPTH: &str = "12";

/// Serialize a single rule match, log it, and optionally persist it to DynamoDB.
///
/// * `best_match` — the winning match to report.
/// * `log` — the log the match was found in (used to extract context lines).
/// * `repo` / `job_id` — identify which CI job the classification belongs to.
/// * `should_write_dynamo` — when its inner flag is `true`, the result is
///   uploaded to DynamoDB; otherwise it is only logged.
/// * `context_depth` — how many context lines to include in the serialization.
///
/// Returns the pretty-printed JSON body of the serialized match.
async fn process_match(
    best_match: &Match,
    log: &Log,
    repo: &str,
    job_id: usize,
    should_write_dynamo: &ShouldWriteDynamo,
    context_depth: usize,
) -> Result<String> {
    // No classifier metadata is attached at this call site (hence `None`).
    let serialized = SerializedMatch::new(best_match, log, None, context_depth);
    let body = serde_json::to_string_pretty(&serialized)?;
    info!("match: {}", body);

    // Persist only when the caller opted in.
    if should_write_dynamo.0 {
        let dynamo = get_dynamo_client().await;
        upload_classification_dynamo(&dynamo, repo, job_id, &serialized).await?;
    }

    Ok(body)
}

async fn handle(
job_id: usize,
repo: &str,
should_write_dynamo: ShouldWriteDynamo,
context_depth: usize,
) -> Result<String> {
) -> Result<Vec<String>> {
let client = get_s3_client().await;
// Download the log from S3.
let start = Instant::now();
let raw_log = download_log(&client, repo, job_id).await?;
info!("download: {:?}", start.elapsed());
let mut results: Vec<String> = Vec::new();

// Do some preprocessing.
let start = Instant::now();
Expand All @@ -38,25 +56,41 @@ async fn handle(
// Run the matching
let start = Instant::now();
let ruleset = RuleSet::new_from_config();
let maybe_match = evaluate_ruleset_by_priority(&ruleset, &log);
let maybe_match_priority = evaluate_ruleset_by_priority(&ruleset, &log);
let maybe_match_position = evaluate_ruleset_by_position(&ruleset, &log);
info!("evaluate: {:?}", start.elapsed());
if let Some(best_match) = maybe_match_priority {
let res = process_match(
&best_match,
&log,
repo,
job_id,
&should_write_dynamo,
context_depth,
)
.await?;
results.push(res);
}

match maybe_match {
Some(best_match) => {
let match_json = SerializedMatch::new(&best_match, &log, context_depth);
let body = serde_json::to_string_pretty(&match_json)?;
info!("match: {}", body);
if should_write_dynamo.0 {
let client = get_dynamo_client().await;
upload_classification_dynamo(&client, repo, job_id, &match_json).await?;
}
Ok(body)
}
None => {
info!("no match found for {}", job_id);
Ok("No match found".into())
}
if let Some(best_match) = maybe_match_position {
let res = process_match(
&best_match,
&log,
repo,
job_id,
&should_write_dynamo,
context_depth,
)
.await?;
results.push(res);
}

if results.is_empty() {
info!("no match found for {}", job_id);
results.push("No match found".into());
}

Ok(results)
}

async fn function_handler(event: Request) -> Result<Response<Body>, Error> {
Expand All @@ -72,8 +106,9 @@ async fn function_handler(event: Request) -> Result<Response<Body>, Error> {
.first("context_depth")
.unwrap_or_else(|| CONTEXT_DEPTH)
.parse::<usize>()?;
handle(job_id, repo, ShouldWriteDynamo(true), context_depth)
.await?
let results = handle(job_id, repo, ShouldWriteDynamo(true), context_depth).await?;
serde_json::to_string(&results)
.unwrap()
.into_response()
.await
}
Expand Down Expand Up @@ -247,7 +282,7 @@ mod test {
let match_ = evaluate_ruleset_by_priority(&ruleset, &log).unwrap();
assert_eq!(match_.line_number, 4);

let match_json = SerializedMatch::new(&match_, &log, 12);
let match_json = SerializedMatch::new(&match_, &log, None, 12);
assert_eq!(
match_json.context,
["++ exit 1", "++ echo DUMMY", "+ python testing"]
Expand Down
10 changes: 9 additions & 1 deletion aws/lambda/log-classifier/src/rule_match.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::log::Log;
use crate::rule::Rule;
use serde::Serialize;
use std::collections::HashMap;

/// Represents a successful match of a log line against a rule.
#[derive(Debug)]
Expand All @@ -19,13 +20,19 @@ pub struct SerializedMatch {
line: String,
line_num: usize,
captures: Vec<String>,
classifier_metadata: HashMap<String, String>,
/// The optional context where this failure occurs. This is a free-form
/// stack of strings that includes the last commands before the failure
pub context: Vec<String>,
}

impl SerializedMatch {
pub fn new(m: &Match, log: &Log, context_depth: usize) -> SerializedMatch {
pub fn new(
m: &Match,
log: &Log,
classifier_metadata: Option<HashMap<String, String>>,
context_depth: usize,
) -> SerializedMatch {
// Unwrap because we know this is a valid key (since the Log object is never mutated.)
let line = log.lines.get(&m.line_number).unwrap();

Expand All @@ -50,6 +57,7 @@ impl SerializedMatch {
line: line.clone(),
line_num: m.line_number,
captures: m.captures.clone(),
classifier_metadata: classifier_metadata.unwrap_or_else(HashMap::new),
context: context.clone(),
}
}
Expand Down

0 comments on commit 95c5eb6

Please sign in to comment.