Skip to content

Commit 935ee79

Browse files
enhancement: implement generic flattening (#1017)
this helps to flatten the nested JSON such that all list type fields gets converted to primitive type hence splitting the repeating array to multiple rows e.g. `[ { "id": 1, "name": "John Doe", "addresses": [ { "street": "123 Main St", "city": "Springfield", "state": "IL", "zip": "62701" }, { "street": "456 Elm St", "city": "Springfield", "state": "IL", "zip": "62702" } ] } ] ` gets converted to below ` [ { "id": 1, "name": "John Doe", "addresses_street": "123 Main St", "addresses_city": "Springfield", "addresses_state": "IL", "addresses_zip": "62701", }, { "id": 1, "name": "John Doe", "addresses_street": "456 Elm St", "addresses_city": "Springfield", "addresses_state": "IL", "addresses_zip": "62702", } ] `
1 parent 199ebfd commit 935ee79

File tree

3 files changed

+77
-26
lines changed

3 files changed

+77
-26
lines changed

src/handlers/http/ingest.rs

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,7 @@ mod tests {
293293
use std::{collections::HashMap, sync::Arc};
294294

295295
use actix_web::test::TestRequest;
296-
use arrow_array::{
297-
types::Int64Type, ArrayRef, Float64Array, Int64Array, ListArray, StringArray,
298-
};
296+
use arrow_array::{ArrayRef, Float64Array, Int64Array, StringArray};
299297
use arrow_schema::{DataType, Field};
300298
use serde_json::json;
301299

@@ -689,25 +687,14 @@ mod tests {
689687
])
690688
);
691689

692-
let c_a = vec![None, None, Some(vec![Some(1i64)]), Some(vec![Some(1)])];
693-
let c_b = vec![None, None, None, Some(vec![Some(2i64)])];
694-
695690
assert_eq!(
696-
rb.column_by_name("c_a")
697-
.unwrap()
698-
.as_any()
699-
.downcast_ref::<ListArray>()
700-
.unwrap(),
701-
&ListArray::from_iter_primitive::<Int64Type, _, _>(c_a)
691+
rb.column_by_name("c_a").unwrap().as_int64_arr(),
692+
&Int64Array::from(vec![None, None, Some(1), Some(1)])
702693
);
703694

704695
assert_eq!(
705-
rb.column_by_name("c_b")
706-
.unwrap()
707-
.as_any()
708-
.downcast_ref::<ListArray>()
709-
.unwrap(),
710-
&ListArray::from_iter_primitive::<Int64Type, _, _>(c_b)
696+
rb.column_by_name("c_b").unwrap().as_int64_arr(),
697+
&Int64Array::from(vec![None, None, None, Some(2)])
711698
);
712699
}
713700
}

src/utils/json/flatten.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,67 @@ pub fn flatten_array_objects(
304304
Ok(())
305305
}
306306

307+
pub fn flatten_json(value: &Value) -> Vec<Value> {
308+
match value {
309+
Value::Array(arr) => {
310+
let mut results = Vec::new();
311+
for item in arr {
312+
results.extend(flatten_json(item));
313+
}
314+
results
315+
}
316+
Value::Object(map) => {
317+
let mut results = vec![map.clone()];
318+
for (key, val) in map {
319+
if matches!(val, Value::Array(_)) {
320+
if let Value::Array(arr) = val {
321+
let mut new_results = Vec::new();
322+
for item in arr {
323+
let flattened_items = flatten_json(item);
324+
for flattened_item in flattened_items {
325+
for result in &results {
326+
let mut new_obj = result.clone();
327+
new_obj.insert(key.clone(), flattened_item.clone());
328+
new_results.push(new_obj);
329+
}
330+
}
331+
}
332+
results = new_results;
333+
}
334+
} else if matches!(val, Value::Object(_)) {
335+
let nested_results = flatten_json(val);
336+
let mut new_results = Vec::new();
337+
for nested_result in nested_results {
338+
for result in &results {
339+
let mut new_obj = result.clone();
340+
new_obj.insert(key.clone(), nested_result.clone());
341+
new_results.push(new_obj);
342+
}
343+
}
344+
results = new_results;
345+
}
346+
}
347+
results.into_iter().map(Value::Object).collect()
348+
}
349+
_ => vec![value.clone()],
350+
}
351+
}
352+
353+
pub fn convert_to_array(flattened: Vec<Value>) -> Result<Value, anyhow::Error> {
354+
let mut result = Vec::new();
355+
for item in flattened {
356+
let mut map = Map::new();
357+
if let Some(item) = item.as_object() {
358+
for (key, value) in item {
359+
map.insert(key.clone(), value.clone());
360+
}
361+
result.push(Value::Object(map));
362+
} else {
363+
return Err(anyhow!("Expected object in array of objects"));
364+
}
365+
}
366+
Ok(Value::Array(result))
367+
}
307368
#[cfg(test)]
308369
mod tests {
309370
use crate::utils::json::flatten::flatten_array_objects;

src/utils/json/mod.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,17 @@ pub fn flatten_json_body(
2828
custom_partition: Option<String>,
2929
validation_required: bool,
3030
) -> Result<Value, anyhow::Error> {
31-
flatten::flatten(
32-
body,
33-
"_",
34-
time_partition,
35-
time_partition_limit,
36-
custom_partition,
37-
validation_required,
38-
)
31+
match flatten::convert_to_array(flatten::flatten_json(&body)) {
32+
Ok(nested_value) => flatten::flatten(
33+
nested_value,
34+
"_",
35+
time_partition,
36+
time_partition_limit,
37+
custom_partition,
38+
validation_required,
39+
),
40+
Err(err) => Err(err),
41+
}
3942
}
4043

4144
pub fn convert_array_to_object(

0 commit comments

Comments
 (0)