Skip to content

Commit a90ed13

Browse files
feat: add custom field p_format_verified (#1303)
If the server successfully verifies the format specified in the `p_format` header, it adds `p_format_verified=true` to the event. Otherwise, if schema detection fails, it adds `p_format_verified=false` to the event.
1 parent ab73e73 commit a90ed13

File tree

5 files changed

+93
-19
lines changed

5 files changed

+93
-19
lines changed

resources/formats.json

+49-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
"name": "alb_log",
7373
"regex": [
7474
{
75-
"pattern": "^(?<type>http|https|h2|ws|wss) (?<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<elb>[^ ]+) (?<client_ip>[\\w\\.:]+):(?<client_port>\\d+) (?<target_ip>[\\w\\.:]+):(?<target_port>\\d+) (?<request_processing_time>[-\\d\\.]+) (?<target_processing_time>[-\\d\\.]+) (?<response_processing_time>[-\\d\\.]+) (?<elb_status_code>\\d+|-) (?<target_status_code>\\d+|-) (?<received_bytes>\\d+) (?<sent_bytes>\\d+) (?<cs_method>POST|GET|PUT|DELETE|HEAD|OPTIONS|CONNECT|TRACE|PATCH) (?<cs_uri_whole>[^ ]+) (?<cs_version>[^ ]+) (?<user_agent>[^ \\(]+) (?:\\([^\\)]+\\))? (?<ssl_cipher>[\\w-]+) (?<ssl_protocol>[\\w\\.-]+) (?<target_group_arn>[^ ]+) (?<trace_id>[^ ]+) (?<domain_name>[^ ]+) (?<chosen_cert_arn>[^ ]+) (?<action_executed>\\d+) (?<request_creation_time>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<redirect_url>[^ ]+) (?<redirect_proto>[^ ]+) (?<redirect_port>[^ ]+) (?<target_ip_port>[\\d\\.:]+) (?<target_status_desc>\\d+|-) (?<classification>[^ ]+) (?<classification_reason>[^ ]+) (?<track_id>[^ ]+)$",
75+
"pattern": "^(?<type>http|https|h2|ws|wss) (?<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<elb>[^ ]+) (?<client_ip>[\\w\\.:]+):(?<client_port>\\d+) (?<target_ip>[\\w\\.:]+):(?<target_port>\\d+) (?<request_processing_time>[-\\d\\.]+) (?<target_processing_time>[-\\d\\.]+) (?<response_processing_time>[-\\d\\.]+) (?<elb_status_code>\\d+|-) (?<target_status_code>\\d+|-) (?<received_bytes>\\d+) (?<sent_bytes>\\d+) (?<cs_method>POST|GET|PUT|DELETE|HEAD|OPTIONS|CONNECT|TRACE|PATCH) (?<cs_uri_whole>[^ ]+) (?<cs_version>[^ ]+) (?<user_agent>.+?) (?<ssl_cipher>[^ ]+) (?<ssl_protocol>[^ ]+) (?<target_group_arn>[^ ]+) (?<trace_id>[^ ]+) (?<domain_name>[^ ]+) (?<chosen_cert_arn>[^ ]+) (?<action_executed>\\d+) (?<request_creation_time>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<redirect_url>[^ ]+) (?<redirect_proto>[^ ]+) (?<redirect_port>[^ ]+) (?<target_ip_port>[\\d\\.:]+) (?<target_status_desc>\\d+|-) (?<classification>[^ ]+) (?<classification_reason>[^ ]+) (?<track_id>[^ ]+)$",
7676
"fields": [
7777
"type",
7878
"timestamp",
@@ -109,6 +109,54 @@
109109
"classification_reason",
110110
"track_id"
111111
]
112+
},
113+
{
114+
"pattern": "^(?<type>http|https|h2|ws|wss) (?<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<elb>[^ ]+) (?<client_ip>[^:]+):(?<client_port>\\d+) (?<target_ip>[^ ]+) (?<request_processing_time>[^ ]+) (?<target_processing_time>[^ ]+) (?<response_processing_time>[^ ]+) (?<elb_status_code>[^ ]+) (?<target_status_code>[^ ]+) (?<received_bytes>[^ ]+) (?<sent_bytes>[^ ]+) (?<cs_method>[^ ]+) (?<cs_uri_whole>[^ ]+) (?<cs_version>[^ ]+) (?<user_agent>.*?) (?<ssl_cipher>[^ ]+) (?<ssl_protocol>[^ ]+) (?<target_group_arn>[^ ]+) (?<trace_id>[^ ]+) (?<domain_name>[^ ]+) (?<chosen_cert_arn>[^ ]+) (?<action_executed>[^ ]+) (?<request_creation_time>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z|[^ ]+) (?<redirect_url>[^ ]+) (?<redirect_proto>[^ ]+) (?<redirect_port>[^ ]+) (?<target_ip_port>[^ ]+|[^ ]*) (?<target_status_desc>[^ ]+|[^ ]*) (?<classification>[^ ]+|[^ ]*) (?<classification_reason>[^ ]+|[^ ]*) (?<track_id>TID_[a-f0-9]+)$",
115+
"fields": [
116+
"type",
117+
"timestamp",
118+
"elb",
119+
"client_ip",
120+
"client_port",
121+
"target_ip",
122+
"request_processing_time",
123+
"target_processing_time",
124+
"response_processing_time",
125+
"elb_status_code",
126+
"target_status_code",
127+
"received_bytes",
128+
"sent_bytes",
129+
"cs_method",
130+
"cs_uri_whole",
131+
"cs_version",
132+
"user_agent",
133+
"ssl_cipher",
134+
"ssl_protocol",
135+
"target_group_arn",
136+
"trace_id",
137+
"domain_name",
138+
"chosen_cert_arn",
139+
"action_executed",
140+
"request_creation_time",
141+
"redirect_url",
142+
"redirect_proto",
143+
"redirect_port",
144+
"target_ip_port",
145+
"target_status_desc",
146+
"classification",
147+
"classification_reason",
148+
"track_id"
149+
]
150+
},
151+
{
152+
"pattern": "^(?<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?<client_ip>[\\d\\.]+) (?<client_port>\\d+) (?<target_port>\\d+) (- ){7}(?<track_id>TID_[a-f0-9]+)$",
153+
"fields": [
154+
"timestamp",
155+
"client_ip",
156+
"client_port",
157+
"target_port",
158+
"track_id"
159+
]
112160
}
113161
]
114162
},

src/event/format/known_schema.rs

+18-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ use serde::{Deserialize, Deserializer};
2424
use serde_json::{Map, Value};
2525
use tracing::error;
2626

27+
use crate::event::FORMAT_VERIFY_KEY;
28+
2729
/// Predefined JSON with known textual logging formats
2830
const FORMATS_JSON: &str = include_str!("../../../resources/formats.json");
2931

@@ -120,11 +122,21 @@ impl SchemaDefinition {
120122
}
121123
}
122124

125+
// format verified: set `FORMAT_VERIFY_KEY` to "true" on the event object
126+
obj.insert(
127+
FORMAT_VERIFY_KEY.to_string(),
128+
Value::String("true".to_string()),
129+
);
130+
123131
obj.extend(extracted_fields);
124132

125133
return Some(format.fields.clone());
126134
}
127-
135+
// schema detection failed: set `FORMAT_VERIFY_KEY` to "false" on the event object
136+
obj.insert(
137+
FORMAT_VERIFY_KEY.to_string(),
138+
Value::String("false".to_string()),
139+
);
128140
None
129141
}
130142
}
@@ -180,6 +192,7 @@ impl EventProcessor {
180192
pub fn extract_from_inline_log(
181193
&self,
182194
json: &mut Value,
195+
p_custom_fields: &mut HashMap<String, String>,
183196
log_source: &str,
184197
extract_log: Option<&str>,
185198
) -> Result<HashSet<String>, Error> {
@@ -197,15 +210,17 @@ impl EventProcessor {
197210
if let Some(known_fields) = schema.check_or_extract(event, extract_log) {
198211
fields.extend(known_fields);
199212
} else {
200-
return Err(Error::Unacceptable(log_source.to_owned()));
213+
// schema check failed: set `FORMAT_VERIFY_KEY` to "false" in `p_custom_fields`
214+
p_custom_fields.insert(FORMAT_VERIFY_KEY.to_string(), "false".to_string());
201215
}
202216
}
203217
}
204218
Value::Object(event) => {
205219
if let Some(known_fields) = schema.check_or_extract(event, extract_log) {
206220
return Ok(known_fields);
207221
} else {
208-
return Err(Error::Unacceptable(log_source.to_owned()));
222+
// schema check failed: set `FORMAT_VERIFY_KEY` to "false" in `p_custom_fields`
223+
p_custom_fields.insert(FORMAT_VERIFY_KEY.to_string(), "false".to_string());
209224
}
210225
}
211226
_ => unreachable!("We don't accept events of the form: {json}"),

src/event/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
3838
pub const USER_AGENT_KEY: &str = "p_user_agent";
3939
pub const SOURCE_IP_KEY: &str = "p_src_ip";
4040
pub const FORMAT_KEY: &str = "p_format";
41+
pub const FORMAT_VERIFY_KEY: &str = "p_format_verified";
4142

4243
#[derive(Clone)]
4344
pub struct Event {

src/handlers/http/ingest.rs

+20-10
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,15 @@ pub async fn ingest(
8181
return Err(PostError::OtelNotSupported);
8282
}
8383

84+
let mut p_custom_fields = get_custom_fields_from_header(&req);
85+
8486
let fields = match &log_source {
85-
LogSource::Custom(src) => {
86-
KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?
87-
}
87+
LogSource::Custom(src) => KNOWN_SCHEMA_LIST.extract_from_inline_log(
88+
&mut json,
89+
&mut p_custom_fields,
90+
src,
91+
extract_log,
92+
)?,
8893
_ => HashSet::new(),
8994
};
9095

@@ -114,7 +119,7 @@ pub async fn ingest(
114119
PARSEABLE
115120
.add_update_log_source(&stream_name, log_source_entry)
116121
.await?;
117-
let p_custom_fields = get_custom_fields_from_header(req);
122+
118123
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
119124

120125
Ok(HttpResponse::Ok().finish())
@@ -198,7 +203,7 @@ pub async fn handle_otel_logs_ingestion(
198203
.add_update_log_source(&stream_name, log_source_entry)
199204
.await?;
200205

201-
let p_custom_fields = get_custom_fields_from_header(req);
206+
let p_custom_fields = get_custom_fields_from_header(&req);
202207

203208
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
204209

@@ -256,7 +261,7 @@ pub async fn handle_otel_metrics_ingestion(
256261
.add_update_log_source(&stream_name, log_source_entry)
257262
.await?;
258263

259-
let p_custom_fields = get_custom_fields_from_header(req);
264+
let p_custom_fields = get_custom_fields_from_header(&req);
260265

261266
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
262267

@@ -315,7 +320,7 @@ pub async fn handle_otel_traces_ingestion(
315320
.add_update_log_source(&stream_name, log_source_entry)
316321
.await?;
317322

318-
let p_custom_fields = get_custom_fields_from_header(req);
323+
let p_custom_fields = get_custom_fields_from_header(&req);
319324

320325
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
321326

@@ -363,13 +368,18 @@ pub async fn post_event(
363368
.headers()
364369
.get(EXTRACT_LOG_KEY)
365370
.and_then(|h| h.to_str().ok());
366-
371+
let mut p_custom_fields = get_custom_fields_from_header(&req);
367372
match &log_source {
368373
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
369374
return Err(PostError::OtelNotSupported)
370375
}
371376
LogSource::Custom(src) => {
372-
KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?;
377+
KNOWN_SCHEMA_LIST.extract_from_inline_log(
378+
&mut json,
379+
&mut p_custom_fields,
380+
src,
381+
extract_log,
382+
)?;
373383
}
374384
_ => {}
375385
}
@@ -386,7 +396,7 @@ pub async fn post_event(
386396
})
387397
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
388398
}
389-
let p_custom_fields = get_custom_fields_from_header(req);
399+
390400
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
391401

392402
Ok(HttpResponse::Ok().finish())

src/handlers/http/modal/utils/ingest_utils.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ async fn push_logs(
145145
Ok(())
146146
}
147147

148-
pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap<String, String> {
148+
pub fn get_custom_fields_from_header(req: &HttpRequest) -> HashMap<String, String> {
149149
let user_agent = req
150150
.headers()
151151
.get(USER_AGENT)
@@ -217,7 +217,7 @@ mod tests {
217217
.insert_header(("x-p-environment", "dev"))
218218
.to_http_request();
219219

220-
let custom_fields = get_custom_fields_from_header(req);
220+
let custom_fields = get_custom_fields_from_header(&req);
221221

222222
assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent");
223223
assert_eq!(custom_fields.get("environment").unwrap(), "dev");
@@ -230,7 +230,7 @@ mod tests {
230230
.insert_header((STREAM_NAME_HEADER_KEY, "teststream"))
231231
.to_http_request();
232232

233-
let custom_fields = get_custom_fields_from_header(req);
233+
let custom_fields = get_custom_fields_from_header(&req);
234234

235235
assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent");
236236
assert!(!custom_fields.contains_key(STREAM_NAME_HEADER_KEY));
@@ -243,7 +243,7 @@ mod tests {
243243
.insert_header((LOG_SOURCE_KEY, "otel-logs"))
244244
.to_http_request();
245245

246-
let custom_fields = get_custom_fields_from_header(req);
246+
let custom_fields = get_custom_fields_from_header(&req);
247247

248248
assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent");
249249
assert_eq!(custom_fields.get(FORMAT_KEY).unwrap(), "otel-logs");
@@ -255,7 +255,7 @@ mod tests {
255255
.insert_header(("x-p-", "empty"))
256256
.to_http_request();
257257

258-
let custom_fields = get_custom_fields_from_header(req);
258+
let custom_fields = get_custom_fields_from_header(&req);
259259

260260
assert_eq!(custom_fields.len(), 2);
261261
assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "");

0 commit comments

Comments
 (0)