Skip to content

Commit d0f76ed

Browse files
Devdutt Shenoinikhilsinhaparseable
Devdutt Shenoi
andauthored
feat: conditionalize generic flattening and type casting for streams v1+ (#1057)
Co-authored-by: Nikhil Sinha <[email protected]>
1 parent 794c8f1 commit d0f76ed

File tree

12 files changed

+550
-441
lines changed

12 files changed

+550
-441
lines changed

src/event/format/json.rs

Lines changed: 42 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ use std::{collections::HashMap, sync::Arc};
3030
use tracing::error;
3131

3232
use super::{EventFormat, Metadata, Tags};
33-
use crate::utils::{arrow::get_field, json::flatten_json_body};
33+
use crate::{
34+
metadata::SchemaVersion,
35+
utils::{arrow::get_field, json::flatten_json_body},
36+
};
3437

3538
pub struct Event {
3639
pub data: Value,
@@ -48,8 +51,9 @@ impl EventFormat for Event {
4851
schema: &HashMap<String, Arc<Field>>,
4952
static_schema_flag: Option<&String>,
5053
time_partition: Option<&String>,
54+
schema_version: SchemaVersion,
5155
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
52-
let data = flatten_json_body(&self.data, None, None, None, false)?;
56+
let data = flatten_json_body(self.data, None, None, None, schema_version, false)?;
5357
let stream_schema = schema;
5458

5559
// incoming event may be a single json or a json array
@@ -68,43 +72,38 @@ impl EventFormat for Event {
6872
let mut is_first = false;
6973
let schema = match derive_arrow_schema(stream_schema, fields) {
7074
Ok(schema) => schema,
71-
Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) {
72-
Ok(mut infer_schema) => {
73-
let new_infer_schema = super::super::format::update_field_type_in_schema(
74-
Arc::new(infer_schema),
75-
Some(stream_schema),
76-
time_partition,
77-
Some(&value_arr),
78-
);
79-
infer_schema = Schema::new(new_infer_schema.fields().clone());
80-
if let Err(err) = Schema::try_merge(vec![
81-
Schema::new(stream_schema.values().cloned().collect::<Fields>()),
82-
infer_schema.clone(),
83-
]) {
84-
return Err(anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err));
85-
}
86-
is_first = true;
87-
infer_schema
88-
.fields
89-
.iter()
90-
.filter(|field| !field.data_type().is_null())
91-
.cloned()
92-
.sorted_by(|a, b| a.name().cmp(b.name()))
93-
.collect()
94-
}
95-
Err(err) => {
96-
return Err(anyhow!(
97-
"Could not infer schema for this event due to err {:?}",
98-
err
99-
))
100-
}
101-
},
75+
Err(_) => {
76+
let mut infer_schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok))
77+
.map_err(|err| {
78+
anyhow!("Could not infer schema for this event due to err {:?}", err)
79+
})?;
80+
let new_infer_schema = super::update_field_type_in_schema(
81+
Arc::new(infer_schema),
82+
Some(stream_schema),
83+
time_partition,
84+
Some(&value_arr),
85+
schema_version,
86+
);
87+
infer_schema = Schema::new(new_infer_schema.fields().clone());
88+
Schema::try_merge(vec![
89+
Schema::new(stream_schema.values().cloned().collect::<Fields>()),
90+
infer_schema.clone(),
91+
]).map_err(|err| anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err))?;
92+
is_first = true;
93+
infer_schema
94+
.fields
95+
.iter()
96+
.filter(|field| !field.data_type().is_null())
97+
.cloned()
98+
.sorted_by(|a, b| a.name().cmp(b.name()))
99+
.collect()
100+
}
102101
};
103102

104103
if static_schema_flag.is_none()
105104
&& value_arr
106105
.iter()
107-
.any(|value| fields_mismatch(&schema, value))
106+
.any(|value| fields_mismatch(&schema, value, schema_version))
108107
{
109108
return Err(anyhow!(
110109
"Could not process this event due to mismatch in datatype"
@@ -165,27 +164,30 @@ fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> Result<Vec<&'a s
165164
Ok(keys)
166165
}
167166

168-
fn fields_mismatch(schema: &[Arc<Field>], body: &Value) -> bool {
167+
fn fields_mismatch(schema: &[Arc<Field>], body: &Value, schema_version: SchemaVersion) -> bool {
169168
for (name, val) in body.as_object().expect("body is of object variant") {
170169
if val.is_null() {
171170
continue;
172171
}
173172
let Some(field) = get_field(schema, name) else {
174173
return true;
175174
};
176-
if !valid_type(field.data_type(), val) {
175+
if !valid_type(field.data_type(), val, schema_version) {
177176
return true;
178177
}
179178
}
180179
false
181180
}
182181

183-
fn valid_type(data_type: &DataType, value: &Value) -> bool {
182+
fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion) -> bool {
184183
match data_type {
185184
DataType::Boolean => value.is_boolean(),
186185
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(),
187186
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(),
188-
DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(),
187+
DataType::Float16 | DataType::Float32 => value.is_f64(),
188+
// All numbers can be cast as Float64 from schema version v1
189+
DataType::Float64 if schema_version == SchemaVersion::V1 => value.is_number(),
190+
DataType::Float64 if schema_version != SchemaVersion::V1 => value.is_f64(),
189191
DataType::Utf8 => value.is_string(),
190192
DataType::List(field) => {
191193
let data_type = field.data_type();
@@ -194,7 +196,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
194196
if elem.is_null() {
195197
continue;
196198
}
197-
if !valid_type(data_type, elem) {
199+
if !valid_type(data_type, elem, schema_version) {
198200
return false;
199201
}
200202
}
@@ -212,7 +214,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
212214
if value.is_null() {
213215
continue;
214216
}
215-
if !valid_type(field.data_type(), value) {
217+
if !valid_type(field.data_type(), value, schema_version) {
216218
return false;
217219
}
218220
} else {

0 commit comments

Comments
 (0)