Skip to content

Commit 8ebd117

Browse files
chore: add env for max level of flattening allowed for events (#1320)
This PR adds env `P_MAX_FLATTEN_LEVEL` to control the maximum level of flattening allowed. It defaults to 10. This is to ensure nested list type fields do not get created. eg. with previous implementation of hard coded level of 4, field gets created with data type - ``` { "name": "Records_resources_accountId", "data_type": { "List": { "name": "item", "data_type": { "List": { "name": "item", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} } }, "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} } }, "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} } ``` after this change, data type changes to - ``` { "name": "Records_resources_accountId", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} } ```
1 parent 2ff1421 commit 8ebd117

File tree

4 files changed

+47
-137
lines changed

4 files changed

+47
-137
lines changed

src/cli.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,16 @@ pub struct Options {
378378
help = "total number of fields recommended in a dataset"
379379
)]
380380
pub dataset_fields_allowed_limit: usize,
381+
382+
// maximum level of flattening allowed for events
383+
// this is to prevent nested list type fields from getting created
384+
#[arg(
385+
long,
386+
env = "P_MAX_FLATTEN_LEVEL",
387+
default_value = "10",
388+
help = "Maximum level of flattening allowed for events"
389+
)]
390+
pub event_flatten_level: usize,
381391
}
382392

383393
#[derive(Parser, Debug)]

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub async fn flatten_and_push_logs(
6262
//custom flattening required for Amazon Kinesis
6363
let message: Message = serde_json::from_value(json)?;
6464
for record in flatten_kinesis_logs(message) {
65-
push_logs(stream_name, record, &LogSource::default(), p_custom_fields).await?;
65+
push_logs(stream_name, record, log_source, p_custom_fields).await?;
6666
}
6767
}
6868
LogSource::OtelLogs => {

src/utils/json/flatten.rs

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ use serde_json::value::Value;
2525

2626
use thiserror::Error;
2727

28+
use crate::parseable::PARSEABLE;
29+
2830
#[derive(Error, Debug)]
2931
pub enum JsonFlattenError {
3032
#[error("Cannot flatten this JSON")]
@@ -274,19 +276,31 @@ pub fn generic_flattening(value: &Value) -> Result<Vec<Value>, JsonFlattenError>
274276
let results = map
275277
.iter()
276278
.fold(vec![Map::new()], |results, (key, val)| match val {
277-
Value::Array(arr) => arr
278-
.iter()
279-
.flat_map(|flatten_item| {
280-
generic_flattening(flatten_item).unwrap_or_default()
281-
})
282-
.flat_map(|flattened_item| {
283-
results.iter().map(move |result| {
284-
let mut new_obj = result.clone();
285-
new_obj.insert(key.clone(), flattened_item.clone());
286-
new_obj
287-
})
288-
})
289-
.collect(),
279+
Value::Array(arr) => {
280+
if arr.is_empty() {
281+
// Insert empty array for this key in all current results
282+
results
283+
.into_iter()
284+
.map(|mut result| {
285+
result.insert(key.clone(), Value::Array(vec![]));
286+
result
287+
})
288+
.collect()
289+
} else {
290+
arr.iter()
291+
.flat_map(|flatten_item| {
292+
generic_flattening(flatten_item).unwrap_or_default()
293+
})
294+
.flat_map(|flattened_item| {
295+
results.iter().map(move |result| {
296+
let mut new_obj = result.clone();
297+
new_obj.insert(key.clone(), flattened_item.clone());
298+
new_obj
299+
})
300+
})
301+
.collect()
302+
}
303+
}
290304
Value::Object(_) => generic_flattening(val)
291305
.unwrap_or_default()
292306
.iter()
@@ -314,21 +328,21 @@ pub fn generic_flattening(value: &Value) -> Result<Vec<Value>, JsonFlattenError>
314328
}
315329

316330
/// recursively checks the level of nesting for the serde Value
317-
/// if Value has more than 4 levels of hierarchy, returns true
318-
/// example -
331+
/// if Value has more than configured `P_MAX_FLATTEN_LEVEL` levels of hierarchy, returns true
332+
/// example - if `P_MAX_FLATTEN_LEVEL` is 4, and the JSON is
319333
/// 1. `{"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}` ~> returns true
320334
/// 2. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> returns false
321-
pub fn has_more_than_four_levels(value: &Value, current_level: usize) -> bool {
322-
if current_level > 4 {
335+
pub fn has_more_than_max_allowed_levels(value: &Value, current_level: usize) -> bool {
336+
if current_level > PARSEABLE.options.event_flatten_level {
323337
return true;
324338
}
325339
match value {
326340
Value::Array(arr) => arr
327341
.iter()
328-
.any(|item| has_more_than_four_levels(item, current_level)),
342+
.any(|item| has_more_than_max_allowed_levels(item, current_level)),
329343
Value::Object(map) => map
330344
.values()
331-
.any(|val| has_more_than_four_levels(val, current_level + 1)),
345+
.any(|val| has_more_than_max_allowed_levels(val, current_level + 1)),
332346
_ => false,
333347
}
334348
}
@@ -344,9 +358,7 @@ pub fn convert_to_array(flattened: Vec<Value>) -> Result<Value, JsonFlattenError
344358

345359
#[cfg(test)]
346360
mod tests {
347-
use crate::utils::json::flatten::{
348-
flatten_array_objects, generic_flattening, has_more_than_four_levels,
349-
};
361+
use crate::utils::json::flatten::{flatten_array_objects, generic_flattening};
350362

351363
use super::{flatten, JsonFlattenError};
352364
use serde_json::{json, Map, Value};
@@ -605,18 +617,6 @@ mod tests {
605617
);
606618
}
607619

608-
#[test]
609-
fn unacceptable_levels_of_nested_json() {
610-
let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}});
611-
assert!(has_more_than_four_levels(&value, 1));
612-
}
613-
614-
#[test]
615-
fn acceptable_levels_of_nested_json() {
616-
let value = json!({"a":{"b":{"e":["a","b"]}}});
617-
assert!(!has_more_than_four_levels(&value, 1));
618-
}
619-
620620
#[test]
621621
fn flatten_json() {
622622
let value = json!({"a":{"b":{"e":["a","b"]}}});

src/utils/json/mod.rs

Lines changed: 2 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
use std::fmt;
2020
use std::num::NonZeroU32;
2121

22-
use flatten::{convert_to_array, generic_flattening, has_more_than_four_levels};
22+
use flatten::{convert_to_array, generic_flattening, has_more_than_max_allowed_levels};
2323
use serde::de::Visitor;
2424
use serde_json;
2525
use serde_json::Value;
@@ -43,7 +43,7 @@ pub fn flatten_json_body(
4343
) -> Result<Value, anyhow::Error> {
4444
// Flatten the json body only if new schema and has less than 4 levels of nesting
4545
let mut nested_value = if schema_version == SchemaVersion::V1
46-
&& !has_more_than_four_levels(&body, 1)
46+
&& !has_more_than_max_allowed_levels(&body, 1)
4747
&& matches!(
4848
log_source,
4949
LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis
@@ -144,50 +144,9 @@ where
144144

145145
#[cfg(test)]
146146
mod tests {
147-
use crate::event::format::LogSource;
148-
149147
use super::*;
150148
use serde::{Deserialize, Serialize};
151149
use serde_json::json;
152-
153-
#[test]
154-
fn hierarchical_json_flattening_success() {
155-
let value = json!({"a":{"b":{"e":["a","b"]}}});
156-
let expected = json!([{"a_b_e": "a"}, {"a_b_e": "b"}]);
157-
assert_eq!(
158-
flatten_json_body(
159-
value,
160-
None,
161-
None,
162-
None,
163-
crate::metadata::SchemaVersion::V1,
164-
false,
165-
&LogSource::default()
166-
)
167-
.unwrap(),
168-
expected
169-
);
170-
}
171-
172-
#[test]
173-
fn hierarchical_json_flattening_failure() {
174-
let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}});
175-
let expected = json!({"a_b_c_d_e": ["a","b"]});
176-
assert_eq!(
177-
flatten_json_body(
178-
value,
179-
None,
180-
None,
181-
None,
182-
crate::metadata::SchemaVersion::V1,
183-
false,
184-
&LogSource::default()
185-
)
186-
.unwrap(),
187-
expected
188-
);
189-
}
190-
191150
#[derive(Serialize, Deserialize)]
192151
struct TestBool {
193152
#[serde(
@@ -353,63 +312,4 @@ mod tests {
353312
flattened_json
354313
);
355314
}
356-
357-
#[test]
358-
fn arr_obj_with_nested_type_v1() {
359-
let json = json!([
360-
{
361-
"a": 1,
362-
"b": "hello",
363-
},
364-
{
365-
"a": 1,
366-
"b": "hello",
367-
},
368-
{
369-
"a": 1,
370-
"b": "hello",
371-
"c": [{"a": 1}]
372-
},
373-
{
374-
"a": 1,
375-
"b": "hello",
376-
"c": [{"a": 1, "b": 2}]
377-
},
378-
]);
379-
let flattened_json = flatten_json_body(
380-
json,
381-
None,
382-
None,
383-
None,
384-
SchemaVersion::V1,
385-
false,
386-
&crate::event::format::LogSource::default(),
387-
)
388-
.unwrap();
389-
390-
assert_eq!(
391-
json!([
392-
{
393-
"a": 1,
394-
"b": "hello",
395-
},
396-
{
397-
"a": 1,
398-
"b": "hello",
399-
},
400-
{
401-
"a": 1,
402-
"b": "hello",
403-
"c_a": 1,
404-
},
405-
{
406-
"a": 1,
407-
"b": "hello",
408-
"c_a": 1,
409-
"c_b": 2,
410-
},
411-
]),
412-
flattened_json
413-
);
414-
}
415315
}

0 commit comments

Comments
 (0)