Skip to content

Commit

Permalink
feat(receiver-mock): get metadata from X-Sumo-* headers
Browse files Browse the repository at this point in the history
We only got metadata from X-Sumo-Fields, this commit adds X-Sumo-Name,
X-Sumo-Host and X-Sumo-Category. This covers all the headers supported
by Sumo's HTTP sources.
  • Loading branch information
Mikołaj Świątek committed Dec 21, 2021
1 parent a9eebde commit 83b7755
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 17 deletions.
5 changes: 4 additions & 1 deletion src/rust/receiver-mock/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,10 @@ mod tests {
])),
];
let body = "{\"log\": \"Log message\", \"timestamp\": 1}";
let raw_logs = metadata.iter().map(|mt| (body.to_string(), mt.to_owned())).collect();
let raw_logs = metadata
.iter()
.map(|mt| (body.to_string(), mt.to_owned()))
.collect();
let repository = LogRepository::from_raw_logs(raw_logs).unwrap();

assert_eq!(
Expand Down
4 changes: 2 additions & 2 deletions src/rust/receiver-mock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ async fn run_app(hostname: String, port: u16, opts: Options) -> std::io::Result<
.route(
"/collector/heartbeat",
web::post().to(router::api::v1::handler_collector_heartbeat),
)
)
),
)
.service(
web::scope("/terraform")
.app_data(app_metadata.clone())
Expand Down
79 changes: 79 additions & 0 deletions src/rust/receiver-mock/src/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,45 @@
use actix_web::http::header::HeaderMap;
use anyhow::anyhow;
use std::collections::HashMap;

pub type Metadata = HashMap<String, String>;

// Get the metadata from Sumo's common http headers
// This does not include X-Sumo-Fields, which only applies to logs and is handled separately
pub fn get_common_metadata_from_headers(headers: &HeaderMap) -> Result<Metadata, anyhow::Error> {
let mut metadata = Metadata::new();

// these metadata field names follow what Sumo itself does
// see: https://help.sumologic.com/05Search/Get-Started-with-Search/Search-Basics/Built-in-Metadata#built-in-metadata-fields
// and: https://help.sumologic.com/03Send-Data/Sources/02Sources-for-Hosted-Collectors/HTTP-Source/Upload-Data-to-an-HTTP-Source#supported-http-headers

match headers.get("x-sumo-name") {
Some(header_value) => match header_value.to_str() {
Ok(header_value_str) => metadata.insert("_sourceName".to_string(), header_value_str.to_string()),
Err(_) => return Err(anyhow!("Couldn't parse X-Sumo-Name header value")),
},
None => None,
};

match headers.get("x-sumo-host") {
Some(header_value) => match header_value.to_str() {
Ok(header_value_str) => metadata.insert("_sourceHost".to_string(), header_value_str.to_string()),
Err(_) => return Err(anyhow!("Couldn't parse X-Sumo-Host header value")),
},
None => None,
};

match headers.get("x-sumo-category") {
Some(header_value) => match header_value.to_str() {
Ok(header_value_str) => metadata.insert("_sourceCategory".to_string(), header_value_str.to_string()),
Err(_) => return Err(anyhow!("Couldn't parse X-Sumo-Category header value")),
},
None => None,
};

return Ok(metadata);
}

/// Parse the value of the X-Sumo-Fields header into a map of field name to field value
pub fn parse_sumo_fields_header_value(header_value: &str) -> Result<Metadata, anyhow::Error> {
let mut field_values = HashMap::new();
Expand All @@ -21,6 +58,7 @@ pub fn parse_sumo_fields_header_value(header_value: &str) -> Result<Metadata, an
#[cfg(test)]
mod tests {
use super::*;
use actix_web::http::{HeaderName, HeaderValue};

#[test]
fn test_parse_sumo_fields_valid() {
Expand Down Expand Up @@ -57,4 +95,45 @@ mod tests {
assert!(parse_sumo_fields_header_value(input).is_err())
}
}

#[test]
fn test_get_common_metadata_from_headers_valid() {
let mut headers = HeaderMap::new();
headers.insert(
HeaderName::from_static("x-sumo-name"),
HeaderValue::from_static("name"),
);
headers.insert(
HeaderName::from_static("x-sumo-host"),
HeaderValue::from_static("host"),
);
headers.insert(
HeaderName::from_static("x-sumo-category"),
HeaderValue::from_static("category"),
);
let metadata = get_common_metadata_from_headers(&headers).unwrap();

assert_eq!(
metadata,
HashMap::from([
(String::from("_sourceName"), String::from("name")),
(String::from("_sourceHost"), String::from("host")),
(String::from("_sourceCategory"), String::from("category"))
])
)
}
#[test]
fn test_get_common_metadata_from_headers_invalid() {
let mut headers = HeaderMap::new();
let invalid_bytes: [u8; 3] = [255, 255, 255];
headers.insert(
HeaderName::from_static("x-sumo-name"),
HeaderValue::from_bytes(&invalid_bytes).unwrap(),
);
let result = get_common_metadata_from_headers(&headers);
match result {
Ok(_) => panic!("Expected error, got valid result"),
Err(error) => assert_eq!(error.to_string(), "Couldn't parse X-Sumo-Name header value"),
}
}
}
1 change: 0 additions & 1 deletion src/rust/receiver-mock/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,4 +375,3 @@ myhostname.mem.wired 5680394240 1601909210"
assert_eq!(*result.metrics_ip_list.get(&ip_address).unwrap(), 9);
}
}

70 changes: 57 additions & 13 deletions src/rust/receiver-mock/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use rand::{thread_rng, Rng};
use serde::{Deserialize, Serialize};

use crate::logs;
use crate::metadata::{parse_sumo_fields_header_value, Metadata};
use crate::metadata::{get_common_metadata_from_headers, parse_sumo_fields_header_value, Metadata};
use crate::metrics;
use crate::metrics::Sample;
use crate::options;
Expand Down Expand Up @@ -435,14 +435,17 @@ pub async fn handler_receiver(

let headers = req.headers();
let empty_header = http::HeaderValue::from_str("").unwrap();
let content_type = headers.get("content-type").unwrap_or(&empty_header).to_str().unwrap();
let content_type = headers
.get("content-type")
.unwrap_or(&empty_header)
.to_str()
.unwrap();

// parse the value of the X-Sumo-Fields header
// parse the value of the X-Sumo-* headers, excluding X-Sumo-Fields, which is handled separately
// TODO: use the metadata for metrics
let x_sumo_fields_value = headers.get("x-sumo-fields").unwrap_or(&empty_header).to_str().unwrap();
let metadata = match parse_sumo_fields_header_value(x_sumo_fields_value) {
let metadata = match get_common_metadata_from_headers(headers) {
Ok(metadata) => metadata,
Err(error) => return HttpResponse::BadRequest().body(format!("Invalid X-Sumo-Fields header value: {}", error)),
Err(error) => return HttpResponse::BadRequest().body(error.to_string()),
};

let mut rng = rand::thread_rng();
Expand Down Expand Up @@ -474,6 +477,18 @@ pub async fn handler_receiver(

// Logs & events
"application/x-www-form-urlencoded" => {
// parse X-Sumo-Fields for metadata
let mut metadata = metadata;
match headers.get("x-sumo-fields") {
Some(header_value) => match header_value.to_str() {
Ok(header_value_str) => match parse_sumo_fields_header_value(header_value_str) {
Ok(fields_metadata) => metadata.extend(fields_metadata),
Err(_) => return HttpResponse::BadRequest().body("Unable to parse X-Sumo-Fields header value"),
},
Err(_) => return HttpResponse::BadRequest().body("Unable to parse X-Sumo-Fields header value"),
},
None => (),
};
app_state.add_log_lines(lines.clone(), metadata, remote_address, &opts);
if opts.print.logs {
for line in lines.clone() {
Expand Down Expand Up @@ -718,7 +733,9 @@ mod tests_terraform {
.await;

{
let req = test::TestRequest::get().uri("/terraform/api/v1/fields").to_request();
let req = test::TestRequest::get()
.uri("/terraform/api/v1/fields")
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), 200);

Expand Down Expand Up @@ -757,7 +774,9 @@ mod tests_terraform {
}

{
let req = test::TestRequest::get().uri("/terraform/api/v1/fields").to_request();
let req = test::TestRequest::get()
.uri("/terraform/api/v1/fields")
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), 200);

Expand Down Expand Up @@ -793,7 +812,9 @@ mod tests_terraform {

// ... and check it exists
{
let req = test::TestRequest::get().uri("/terraform/api/v1/fields").to_request();
let req = test::TestRequest::get()
.uri("/terraform/api/v1/fields")
.to_request();
let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), 200);

Expand Down Expand Up @@ -1014,7 +1035,9 @@ mod tests_metrics {
{
// Checking for existence of `namespace` label should also yield the
// second time series only.
let req = test::TestRequest::get().uri("/metrics-samples?namespace").to_request();
let req = test::TestRequest::get()
.uri("/metrics-samples?namespace")
.to_request();

let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), 200);
Expand All @@ -1040,7 +1063,9 @@ mod tests_metrics {
}
{
// and now let's check the previous time series with URL query params
let req = test::TestRequest::get().uri("/metrics-samples?mock=yes").to_request();
let req = test::TestRequest::get()
.uri("/metrics-samples?mock=yes")
.to_request();

let resp = test::call_service(&mut app, req).await;
assert_eq!(resp.status(), 200);
Expand Down Expand Up @@ -1142,6 +1167,9 @@ mod tests_logs {
.set_payload(log_payload)
.header("Content-Type", "application/x-www-form-urlencoded")
.header("X-Sumo-Fields", x_sumo_fields_value)
.header("X-Sumo-Host", "localhost")
.header("X-Sumo-Category", "category")
.header("X-Sumo-Name", "name")
.to_request();

let resp = test::call_service(&mut app, req).await;
Expand All @@ -1161,7 +1189,9 @@ mod tests_logs {

// from_ts is inclusive
{
let req = test::TestRequest::get().uri("/logs/count?from_ts=5").to_request();
let req = test::TestRequest::get()
.uri("/logs/count?from_ts=5")
.to_request();
let resp = test::call_service(&mut app, req).await;

let response_body: LogsCountResponse = test::read_body_json(resp).await;
Expand Down Expand Up @@ -1191,9 +1221,23 @@ mod tests_logs {
assert_eq!(response_body.count, 2);
}

// X-Sumo-* fields
{
let req = test::TestRequest::get()
.uri("/logs/count?_sourceName=name&_sourceHost=localhost&_sourceCategory=category")
.to_request();
let resp = test::call_service(&mut app, req).await;

let response_body: LogsCountResponse = test::read_body_json(resp).await;

assert_eq!(response_body.count, 3);
}

// wildcard query
{
let req = test::TestRequest::get().uri("/logs/count?namespace=").to_request();
let req = test::TestRequest::get()
.uri("/logs/count?namespace=")
.to_request();
let resp = test::call_service(&mut app, req).await;

let response_body: LogsCountResponse = test::read_body_json(resp).await;
Expand Down

0 comments on commit 83b7755

Please sign in to comment.