Commit 450bac2
nikhilsinhaparseable and Devdutt Shenoi authored
fix: hierarchical json flattening restriction (#1058)

Restrict hierarchical JSON flattening: get the level of hierarchy (nesting depth) from the JSON, and perform generic flattening only if the level of nesting is <= 4.

---------

Co-authored-by: Devdutt Shenoi <[email protected]>
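To make the restriction concrete, here is a minimal, hypothetical sketch of such a depth check, assuming serde_json; the names (nesting_depth, should_flatten) and the exact depth-counting convention are illustrative, not the code from this commit:

use serde_json::Value;

// Illustrative sketch: count the nesting depth of a JSON value.
// A scalar counts as depth 1; each enclosing object or array adds one level.
fn nesting_depth(value: &Value) -> usize {
    match value {
        Value::Object(map) => 1 + map.values().map(nesting_depth).max().unwrap_or(0),
        Value::Array(arr) => 1 + arr.iter().map(nesting_depth).max().unwrap_or(0),
        _ => 1,
    }
}

// The rule from the commit message: generic flattening only when depth <= 4.
fn should_flatten(value: &Value) -> bool {
    nesting_depth(value) <= 4
}

fn main() {
    let shallow: Value = serde_json::from_str(r#"{"a": {"b": 1}}"#).unwrap();
    let deep: Value = serde_json::from_str(r#"{"a": {"b": {"c": {"d": {"e": 1}}}}}"#).unwrap();
    assert!(should_flatten(&shallow)); // depth 3, within the limit
    assert!(!should_flatten(&deep)); // depth 6, too deeply nested
}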
1 parent d9ae6b8 commit 450bac2

3 files changed: +303 −95 lines changed

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 157 additions & 45 deletions
@@ -23,13 +23,12 @@ use anyhow::anyhow;
 use arrow_schema::Field;
 use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, Utc};
-use itertools::Itertools;
 use serde_json::Value;
 
 use crate::{
     event::{
+        self,
         format::{self, EventFormat},
-        Event,
     },
     handlers::{
         http::{ingest::PostError, kinesis},
@@ -73,61 +72,174 @@ pub async fn push_logs(
     let custom_partition = STREAM_INFO.get_custom_partition(stream_name)?;
     let schema_version = STREAM_INFO.get_schema_version(stream_name)?;
     let body_val: Value = serde_json::from_slice(body)?;
-    let data = convert_array_to_object(
-        body_val,
-        time_partition.as_ref(),
-        time_partition_limit,
-        custom_partition.as_ref(),
-        schema_version,
-    )?;
 
-    for value in data {
-        let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length
-        let parsed_timestamp = match time_partition.as_ref() {
-            Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
-            _ => Utc::now().naive_utc(),
-        };
-        let custom_partition_values = match custom_partition.as_ref() {
-            Some(custom_partition) => {
-                let custom_partitions = custom_partition.split(',').collect_vec();
-                get_custom_partition_values(&value, &custom_partitions)
+    let size: usize = body.len();
+    let mut parsed_timestamp = Utc::now().naive_utc();
+    if time_partition.is_none() {
+        if custom_partition.is_none() {
+            let size = size as u64;
+            create_process_record_batch(
+                stream_name,
+                req,
+                body_val,
+                static_schema_flag.as_ref(),
+                None,
+                parsed_timestamp,
+                &HashMap::new(),
+                size,
+                schema_version,
+            )
+            .await?;
+        } else {
+            let data = convert_array_to_object(
+                body_val,
+                None,
+                None,
+                custom_partition.as_ref(),
+                schema_version,
+            )?;
+            let custom_partition = custom_partition.unwrap();
+            let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
+
+            for value in data {
+                let custom_partition_values =
+                    get_custom_partition_values(&value, &custom_partition_list);
+
+                let size = value.to_string().into_bytes().len() as u64;
+                create_process_record_batch(
+                    stream_name,
+                    req,
+                    value,
+                    static_schema_flag.as_ref(),
+                    None,
+                    parsed_timestamp,
+                    &custom_partition_values,
+                    size,
+                    schema_version,
+                )
+                .await?;
             }
-            None => HashMap::new(),
-        };
-        let schema = STREAM_INFO
-            .read()
-            .unwrap()
-            .get(stream_name)
-            .ok_or(PostError::StreamNotFound(stream_name.to_owned()))?
-            .schema
-            .clone();
-        let (rb, is_first_event) = into_event_batch(
-            req,
-            &value,
-            schema,
-            static_schema_flag.as_ref(),
+        }
+    } else if custom_partition.is_none() {
+        let data = convert_array_to_object(
+            body_val,
+            time_partition.as_ref(),
+            time_partition_limit,
+            None,
+            schema_version,
+        )?;
+        for value in data {
+            parsed_timestamp = get_parsed_timestamp(&value, time_partition.as_ref().unwrap())?;
+            let size = value.to_string().into_bytes().len() as u64;
+            create_process_record_batch(
+                stream_name,
+                req,
+                value,
+                static_schema_flag.as_ref(),
+                time_partition.as_ref(),
+                parsed_timestamp,
+                &HashMap::new(),
+                size,
+                schema_version,
+            )
+            .await?;
+        }
+    } else {
+        let data = convert_array_to_object(
+            body_val,
             time_partition.as_ref(),
+            time_partition_limit,
+            custom_partition.as_ref(),
             schema_version,
         )?;
+        let custom_partition = custom_partition.unwrap();
+        let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
 
-        Event {
-            rb,
-            stream_name: stream_name.to_owned(),
-            origin_format: "json",
-            origin_size,
-            is_first_event,
-            parsed_timestamp,
-            time_partition: time_partition.clone(),
-            custom_partition_values,
-            stream_type: StreamType::UserDefined,
+        for value in data {
+            let custom_partition_values =
+                get_custom_partition_values(&value, &custom_partition_list);
+
+            parsed_timestamp = get_parsed_timestamp(&value, time_partition.as_ref().unwrap())?;
+            let size = value.to_string().into_bytes().len() as u64;
+            create_process_record_batch(
+                stream_name,
+                req,
+                value,
+                static_schema_flag.as_ref(),
+                time_partition.as_ref(),
+                parsed_timestamp,
+                &custom_partition_values,
+                size,
+                schema_version,
+            )
+            .await?;
         }
-        .process()
-        .await?;
     }
 
     Ok(())
 }
 
+#[allow(clippy::too_many_arguments)]
+pub async fn create_process_record_batch(
+    stream_name: &str,
+    req: &HttpRequest,
+    value: Value,
+    static_schema_flag: Option<&String>,
+    time_partition: Option<&String>,
+    parsed_timestamp: NaiveDateTime,
+    custom_partition_values: &HashMap<String, String>,
+    origin_size: u64,
+    schema_version: SchemaVersion,
+) -> Result<(), PostError> {
+    let (rb, is_first_event) = get_stream_schema(
+        stream_name,
+        req,
+        &value,
+        static_schema_flag,
+        time_partition,
+        schema_version,
+    )?;
+    event::Event {
+        rb,
+        stream_name: stream_name.to_owned(),
+        origin_format: "json",
+        origin_size,
+        is_first_event,
+        parsed_timestamp,
+        time_partition: time_partition.cloned(),
+        custom_partition_values: custom_partition_values.clone(),
+        stream_type: StreamType::UserDefined,
+    }
+    .process()
+    .await?;
+
+    Ok(())
+}
+
+pub fn get_stream_schema(
+    stream_name: &str,
+    req: &HttpRequest,
+    body: &Value,
+    static_schema_flag: Option<&String>,
+    time_partition: Option<&String>,
+    schema_version: SchemaVersion,
+) -> Result<(arrow_array::RecordBatch, bool), PostError> {
+    let hash_map = STREAM_INFO.read().unwrap();
+    let schema = hash_map
+        .get(stream_name)
+        .ok_or(PostError::StreamNotFound(stream_name.to_owned()))?
+        .schema
+        .clone();
+    into_event_batch(
+        req,
+        body,
+        schema,
+        static_schema_flag,
+        time_partition,
+        schema_version,
+    )
+}
+
 pub fn into_event_batch(
     req: &HttpRequest,
     body: &Value,
