From 83b7755503caffdbf6e708d5537a5721e271fad2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Sun, 19 Dec 2021 18:17:03 +0100 Subject: [PATCH] feat(receiver-mock): get metadata from X-Sumo-* headers We only got metadata from X-Sumo-Fields, this commit adds X-Sumo-Name, X-Sumo-Host and X-Sumo-Category. This covers all the headers supported by Sumo's HTTP sources. --- src/rust/receiver-mock/src/logs.rs | 5 +- src/rust/receiver-mock/src/main.rs | 4 +- src/rust/receiver-mock/src/metadata.rs | 79 ++++++++++++++++++++++++ src/rust/receiver-mock/src/metrics.rs | 1 - src/rust/receiver-mock/src/router/mod.rs | 70 +++++++++++++++++---- 5 files changed, 142 insertions(+), 17 deletions(-) diff --git a/src/rust/receiver-mock/src/logs.rs b/src/rust/receiver-mock/src/logs.rs index e99b0984..30eda17b 100644 --- a/src/rust/receiver-mock/src/logs.rs +++ b/src/rust/receiver-mock/src/logs.rs @@ -223,7 +223,10 @@ mod tests { ])), ]; let body = "{\"log\": \"Log message\", \"timestamp\": 1}"; - let raw_logs = metadata.iter().map(|mt| (body.to_string(), mt.to_owned())).collect(); + let raw_logs = metadata + .iter() + .map(|mt| (body.to_string(), mt.to_owned())) + .collect(); let repository = LogRepository::from_raw_logs(raw_logs).unwrap(); assert_eq!( diff --git a/src/rust/receiver-mock/src/main.rs b/src/rust/receiver-mock/src/main.rs index e91c85b3..d251b0b2 100644 --- a/src/rust/receiver-mock/src/main.rs +++ b/src/rust/receiver-mock/src/main.rs @@ -145,8 +145,8 @@ async fn run_app(hostname: String, port: u16, opts: Options) -> std::io::Result< .route( "/collector/heartbeat", web::post().to(router::api::v1::handler_collector_heartbeat), - ) - ) + ), + ) .service( web::scope("/terraform") .app_data(app_metadata.clone()) diff --git a/src/rust/receiver-mock/src/metadata.rs b/src/rust/receiver-mock/src/metadata.rs index 1f71c34a..dce82e4b 100644 --- a/src/rust/receiver-mock/src/metadata.rs +++ b/src/rust/receiver-mock/src/metadata.rs @@ -1,8 +1,45 @@ +use actix_web::http::header::HeaderMap; use anyhow::anyhow; use std::collections::HashMap; pub type Metadata = HashMap; +// Get the metadata from Sumo's common http headers +// This does not include X-Sumo-Fields, which only applies to logs and is handled separately +pub fn get_common_metadata_from_headers(headers: &HeaderMap) -> Result { + let mut metadata = Metadata::new(); + + // these metadata field names follow what Sumo itself does + // see: https://help.sumologic.com/05Search/Get-Started-with-Search/Search-Basics/Built-in-Metadata#built-in-metadata-fields + // and: https://help.sumologic.com/03Send-Data/Sources/02Sources-for-Hosted-Collectors/HTTP-Source/Upload-Data-to-an-HTTP-Source#supported-http-headers + + match headers.get("x-sumo-name") { + Some(header_value) => match header_value.to_str() { + Ok(header_value_str) => metadata.insert("_sourceName".to_string(), header_value_str.to_string()), + Err(_) => return Err(anyhow!("Couldn't parse X-Sumo-Name header value")), + }, + None => None, + }; + + match headers.get("x-sumo-host") { + Some(header_value) => match header_value.to_str() { + Ok(header_value_str) => metadata.insert("_sourceHost".to_string(), header_value_str.to_string()), + Err(_) => return Err(anyhow!("Couldn't parse X-Sumo-Host header value")), + }, + None => None, + }; + + match headers.get("x-sumo-category") { + Some(header_value) => match header_value.to_str() { + Ok(header_value_str) => metadata.insert("_sourceCategory".to_string(), header_value_str.to_string()), + Err(_) => return Err(anyhow!("Couldn't parse X-Sumo-Category header value")), + }, + None => None, + }; + + return Ok(metadata); +} + /// Parse the value of the X-Sumo-Fields header into a map of field name to field value pub fn parse_sumo_fields_header_value(header_value: &str) -> Result { let mut field_values = HashMap::new(); @@ -21,6 +58,7 @@ pub fn parse_sumo_fields_header_value(header_value: &str) -> Result panic!("Expected error, got valid result"), + Err(error) => assert_eq!(error.to_string(), "Couldn't parse X-Sumo-Name header value"), + } + } } diff --git a/src/rust/receiver-mock/src/metrics.rs b/src/rust/receiver-mock/src/metrics.rs index 7d59ed7b..beb2b569 100644 --- a/src/rust/receiver-mock/src/metrics.rs +++ b/src/rust/receiver-mock/src/metrics.rs @@ -375,4 +375,3 @@ myhostname.mem.wired 5680394240 1601909210" assert_eq!(*result.metrics_ip_list.get(&ip_address).unwrap(), 9); } } - diff --git a/src/rust/receiver-mock/src/router/mod.rs b/src/rust/receiver-mock/src/router/mod.rs index 96a0106c..3083f4c2 100644 --- a/src/rust/receiver-mock/src/router/mod.rs +++ b/src/rust/receiver-mock/src/router/mod.rs @@ -11,7 +11,7 @@ use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; use crate::logs; -use crate::metadata::{parse_sumo_fields_header_value, Metadata}; +use crate::metadata::{get_common_metadata_from_headers, parse_sumo_fields_header_value, Metadata}; use crate::metrics; use crate::metrics::Sample; use crate::options; @@ -435,14 +435,17 @@ pub async fn handler_receiver( let headers = req.headers(); let empty_header = http::HeaderValue::from_str("").unwrap(); - let content_type = headers.get("content-type").unwrap_or(&empty_header).to_str().unwrap(); + let content_type = headers + .get("content-type") + .unwrap_or(&empty_header) + .to_str() + .unwrap(); - // parse the value of the X-Sumo-Fields header + // parse the value of the X-Sumo-* headers, excluding X-Sumo-Fields, which is handled separately // TODO: use the metadata for metrics - let x_sumo_fields_value = headers.get("x-sumo-fields").unwrap_or(&empty_header).to_str().unwrap(); - let metadata = match parse_sumo_fields_header_value(x_sumo_fields_value) { + let metadata = match get_common_metadata_from_headers(headers) { Ok(metadata) => metadata, - Err(error) => return HttpResponse::BadRequest().body(format!("Invalid X-Sumo-Fields header value: {}", error)), + Err(error) => return HttpResponse::BadRequest().body(error.to_string()), }; let mut rng = rand::thread_rng(); @@ -474,6 +477,18 @@ pub async fn handler_receiver( // Logs & events "application/x-www-form-urlencoded" => { + // parse X-Sumo-Fields for metadata + let mut metadata = metadata; + match headers.get("x-sumo-fields") { + Some(header_value) => match header_value.to_str() { + Ok(header_value_str) => match parse_sumo_fields_header_value(header_value_str) { + Ok(fields_metadata) => metadata.extend(fields_metadata), + Err(_) => return HttpResponse::BadRequest().body("Unable to parse X-Sumo-Fields header value"), + }, + Err(_) => return HttpResponse::BadRequest().body("Unable to parse X-Sumo-Fields header value"), + }, + None => (), + }; app_state.add_log_lines(lines.clone(), metadata, remote_address, &opts); if opts.print.logs { for line in lines.clone() { @@ -718,7 +733,9 @@ mod tests_terraform { .await; { - let req = test::TestRequest::get().uri("/terraform/api/v1/fields").to_request(); + let req = test::TestRequest::get() + .uri("/terraform/api/v1/fields") + .to_request(); let resp = test::call_service(&mut app, req).await; assert_eq!(resp.status(), 200); @@ -757,7 +774,9 @@ mod tests_terraform { } { - let req = test::TestRequest::get().uri("/terraform/api/v1/fields").to_request(); + let req = test::TestRequest::get() + .uri("/terraform/api/v1/fields") + .to_request(); let resp = test::call_service(&mut app, req).await; assert_eq!(resp.status(), 200); @@ -793,7 +812,9 @@ mod tests_terraform { // ... and check it exists { - let req = test::TestRequest::get().uri("/terraform/api/v1/fields").to_request(); + let req = test::TestRequest::get() + .uri("/terraform/api/v1/fields") + .to_request(); let resp = test::call_service(&mut app, req).await; assert_eq!(resp.status(), 200); @@ -1014,7 +1035,9 @@ mod tests_metrics { { // Checking for existence of `namespace` label should also yield the // second time series only. - let req = test::TestRequest::get().uri("/metrics-samples?namespace").to_request(); + let req = test::TestRequest::get() + .uri("/metrics-samples?namespace") + .to_request(); let resp = test::call_service(&mut app, req).await; assert_eq!(resp.status(), 200); @@ -1040,7 +1063,9 @@ mod tests_metrics { } { // and now let's check the previous time series with URL query params - let req = test::TestRequest::get().uri("/metrics-samples?mock=yes").to_request(); + let req = test::TestRequest::get() + .uri("/metrics-samples?mock=yes") + .to_request(); let resp = test::call_service(&mut app, req).await; assert_eq!(resp.status(), 200); @@ -1142,6 +1167,9 @@ mod tests_logs { .set_payload(log_payload) .header("Content-Type", "application/x-www-form-urlencoded") .header("X-Sumo-Fields", x_sumo_fields_value) + .header("X-Sumo-Host", "localhost") + .header("X-Sumo-Category", "category") + .header("X-Sumo-Name", "name") .to_request(); let resp = test::call_service(&mut app, req).await; @@ -1161,7 +1189,9 @@ mod tests_logs { // from_ts is inclusive { - let req = test::TestRequest::get().uri("/logs/count?from_ts=5").to_request(); + let req = test::TestRequest::get() + .uri("/logs/count?from_ts=5") + .to_request(); let resp = test::call_service(&mut app, req).await; let response_body: LogsCountResponse = test::read_body_json(resp).await; @@ -1191,9 +1221,23 @@ mod tests_logs { assert_eq!(response_body.count, 2); } + // X-Sumo-* fields + { + let req = test::TestRequest::get() + .uri("/logs/count?_sourceName=name&_sourceHost=localhost&_sourceCategory=category") + .to_request(); + let resp = test::call_service(&mut app, req).await; + + let response_body: LogsCountResponse = test::read_body_json(resp).await; + + assert_eq!(response_body.count, 3); + } + // wildcard query { - let req = test::TestRequest::get().uri("/logs/count?namespace=").to_request(); + let req = test::TestRequest::get() + .uri("/logs/count?namespace=") + .to_request(); let resp = test::call_service(&mut app, req).await; let response_body: LogsCountResponse = test::read_body_json(resp).await;