Skip to content

Commit

Permalink
[WIP] upload classification results to rockset with metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
PaliC committed Oct 17, 2023
1 parent a36efe9 commit f78323d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 18 deletions.
53 changes: 36 additions & 17 deletions aws/lambda/log-classifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,34 @@ use log_classifier::rule_match::SerializedMatch;

struct ShouldWriteDynamo(bool);

async fn process_match(
best_match: &Match ,
log: &Log,
repo: &str,
job_id: usize,
should_write_dynamo: ShouldWriteDynamo,
) -> Result<String> {
let match_json = SerializedMatch::new(best_match, log, None);
let body = serde_json::to_string_pretty(&match_json)?;
info!("match: {}", body);
if should_write_dynamo.0 {
let client = get_dynamo_client().await;
upload_classification_dynamo(&client, repo, job_id, &match_json).await?;
}
Ok(body)
}

async fn handle(
job_id: usize,
repo: &str,
should_write_dynamo: ShouldWriteDynamo,
) -> Result<String> {
) -> Vec<Result<String>> {
let client = get_s3_client().await;
// Download the log from S3.
let start = Instant::now();
let raw_log = download_log(&client, repo, job_id).await?;
info!("download: {:?}", start.elapsed());
let mut results: Vec<String> = Vec::new();

// Do some preprocessing.
let start = Instant::now();
Expand All @@ -34,25 +52,26 @@ async fn handle(
// Run the matching
let start = Instant::now();
let ruleset = RuleSet::new_from_config();
let maybe_match = evaluate_ruleset_by_priority(&ruleset, &log);
let maybe_match_priority = evaluate_ruleset_by_priority(&ruleset, &log);
let maybe_match_position = evaluate_ruleset_by_position(&ruleset, &log);
info!("evaluate: {:?}", start.elapsed());

match maybe_match {
Some(best_match) => {
let match_json = SerializedMatch::new(&best_match, &log);
let body = serde_json::to_string_pretty(&match_json)?;
info!("match: {}", body);
if should_write_dynamo.0 {
let client = get_dynamo_client().await;
upload_classification_dynamo(&client, repo, job_id, &match_json).await?;
}
Ok(body)
}
None => {
info!("no match found for {}", job_id);
Ok("No match found".into())
}
if let Some(best_match) = maybe_match_priority {
let res = process_match(&best_match, &log, repo, job_id, should_write_dynamo).await?;
results.push(res);
}

if let Some(best_match) = maybe_match_position {
let res = process_match(&best_match, &log, repo, job_id, should_write_dynamo).await?;
results.push(res);
}

if results.is_empty() {
info!("no match found for {}", job_id);
results.push("No match found".into());
}

Ok(results)
}

async fn function_handler(event: Request) -> Result<Response<Body>, Error> {
Expand Down
5 changes: 4 additions & 1 deletion aws/lambda/log-classifier/src/rule_match.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::log::Log;
use crate::rule::Rule;
use serde::Serialize;
use std::collections::HashMap;

/// Represents a successful match of a log line against a rule.
#[derive(Debug)]
Expand All @@ -19,17 +20,19 @@ pub struct SerializedMatch {
line: String,
line_num: usize,
captures: Vec<String>,
classifier_metadata: HashMap<String, String>,
}

impl SerializedMatch {
pub fn new(m: &Match, log: &Log) -> SerializedMatch {
pub fn new(m: &Match, log: &Log, classifier_metadata: Option<HashMap<String, String>>) -> SerializedMatch {
// Unwrap because we know this is a valid key (since the Log object is never mutated.)
let line = log.lines.get(&m.line_number).unwrap();
SerializedMatch {
rule: m.rule.name.clone(),
line: line.clone(),
line_num: m.line_number,
captures: m.captures.clone(),
classifier_metadata: classifier_metadata.unwrap_or_else(HashMap::new),
}
}
}

0 comments on commit f78323d

Please sign in to comment.