Skip to content

Commit fab4ee3

Browse files
fix: add custom fields for pmeta stream (#1280)
add p_custom_field - p_user_agent = parseable p_format = pmeta changed user_agent_key to `parseable` to keep it consistent with the user_agent_key used in kafka (all lowercase) Signed-off-by: Nikhil Sinha <[email protected]> --------- Signed-off-by: Nikhil Sinha <[email protected]>
1 parent 7adcf12 commit fab4ee3

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

src/handlers/http/ingest.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ use chrono::Utc;
2626
use http::StatusCode;
2727
use serde_json::Value;
2828

29-
use crate::event;
3029
use crate::event::error::EventError;
3130
use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
3231
use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
32+
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
3333
use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
3434
use crate::metadata::SchemaVersion;
3535
use crate::option::Mode;
@@ -124,7 +124,9 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
124124
let size: usize = body.len();
125125
let json: Value = serde_json::from_slice(&body)?;
126126
let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw();
127-
127+
let mut p_custom_fields = HashMap::new();
128+
p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string());
129+
p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string());
128130
// For internal streams, use old schema
129131
format::json::Event::new(json)
130132
.into_event(
@@ -136,7 +138,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
136138
None,
137139
SchemaVersion::V0,
138140
StreamType::Internal,
139-
&HashMap::new(),
141+
&p_custom_fields,
140142
)?
141143
.process()?;
142144

0 commit comments

Comments
 (0)