Skip to content

Commit 6d3632a

Browse files
nikhilsinhaparseable, nitisht, de-sh
authored
fix: convert all number data types to float (#1027)
The server checks whether an event contains any numeric fields (ints or floats) and, if so, updates the data type of all of them to Float64. This is useful to allow users to dynamically switch between an int and a float in their events. --------- Signed-off-by: Nitish Tiwari <[email protected]> Co-authored-by: Nitish Tiwari <[email protected]> Co-authored-by: Devdutt Shenoi <[email protected]>
1 parent f6ac00e commit 6d3632a

File tree

6 files changed

+91
-36
lines changed

6 files changed

+91
-36
lines changed

src/catalog/column.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ impl TypedStatistics {
6666
max: max(this.max, other.max),
6767
})
6868
}
69+
70+
// Ints are cast to Float if self is Float and other is Int
71+
(TypedStatistics::Float(this), TypedStatistics::Int(other)) => {
72+
TypedStatistics::Float(Float64Type {
73+
min: this.min.min(other.min as f64),
74+
max: this.max.max(other.max as f64),
75+
})
76+
}
6977
(TypedStatistics::Float(this), TypedStatistics::Float(other)) => {
7078
TypedStatistics::Float(Float64Type {
7179
min: this.min.min(other.min),

src/event/format/json.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,9 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
185185
DataType::Boolean => value.is_boolean(),
186186
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(),
187187
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(),
188-
DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(),
188+
DataType::Float16 | DataType::Float32 => value.is_f64(),
189+
// NOTE: All numbers can be ingested as Float64
190+
DataType::Float64 => value.is_number(),
189191
DataType::Utf8 => value.is_string(),
190192
DataType::List(field) => {
191193
let data_type = field.data_type();

src/event/format/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,24 @@ pub fn override_timestamp_fields(
204204
Arc::new(Schema::new(updated_fields))
205205
}
206206

207+
/// All number fields from inferred schema are forced into Float64
208+
pub fn override_num_fields_from_schema(schema: Vec<Arc<Field>>) -> Vec<Arc<Field>> {
209+
schema
210+
.iter()
211+
.map(|field| {
212+
if field.data_type().is_numeric() {
213+
Arc::new(Field::new(
214+
field.name(),
215+
DataType::Float64,
216+
field.is_nullable(),
217+
))
218+
} else {
219+
field.clone()
220+
}
221+
})
222+
.collect::<Vec<Arc<Field>>>()
223+
}
224+
207225
pub fn update_field_type_in_schema(
208226
inferred_schema: Arc<Schema>,
209227
existing_schema: Option<&HashMap<String, Arc<Field>>>,
@@ -212,6 +230,10 @@ pub fn update_field_type_in_schema(
212230
) -> Arc<Schema> {
213231
let mut updated_schema = inferred_schema.clone();
214232

233+
// All number fields from inferred schema are forced into Float64
234+
updated_schema = Arc::new(Schema::new(override_num_fields_from_schema(
235+
updated_schema.fields().to_vec(),
236+
)));
215237
if let Some(existing_schema) = existing_schema {
216238
let existing_fields = get_existing_fields(inferred_schema.clone(), Some(existing_schema));
217239
let existing_timestamp_fields = get_existing_timestamp_fields(existing_schema);

src/handlers/http/ingest.rs

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ mod tests {
277277
use std::{collections::HashMap, sync::Arc};
278278

279279
use actix_web::test::TestRequest;
280-
use arrow_array::{ArrayRef, Float64Array, Int64Array, StringArray};
280+
use arrow_array::{ArrayRef, Float64Array, StringArray};
281281
use arrow_schema::{DataType, Field};
282282
use serde_json::json;
283283

@@ -287,16 +287,11 @@ mod tests {
287287
};
288288

289289
trait TestExt {
290-
fn as_int64_arr(&self) -> &Int64Array;
291290
fn as_float64_arr(&self) -> &Float64Array;
292291
fn as_utf8_arr(&self) -> &StringArray;
293292
}
294293

295294
impl TestExt for ArrayRef {
296-
fn as_int64_arr(&self) -> &Int64Array {
297-
self.as_any().downcast_ref().unwrap()
298-
}
299-
300295
fn as_float64_arr(&self) -> &Float64Array {
301296
self.as_any().downcast_ref().unwrap()
302297
}
@@ -328,8 +323,8 @@ mod tests {
328323
assert_eq!(rb.num_rows(), 1);
329324
assert_eq!(rb.num_columns(), 6);
330325
assert_eq!(
331-
rb.column_by_name("a").unwrap().as_int64_arr(),
332-
&Int64Array::from_iter([1])
326+
rb.column_by_name("a").unwrap().as_float64_arr(),
327+
&Float64Array::from_iter([1.0])
333328
);
334329
assert_eq!(
335330
rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -368,8 +363,8 @@ mod tests {
368363
assert_eq!(rb.num_rows(), 1);
369364
assert_eq!(rb.num_columns(), 5);
370365
assert_eq!(
371-
rb.column_by_name("a").unwrap().as_int64_arr(),
372-
&Int64Array::from_iter([1])
366+
rb.column_by_name("a").unwrap().as_float64_arr(),
367+
&Float64Array::from_iter([1.0])
373368
);
374369
assert_eq!(
375370
rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -386,7 +381,7 @@ mod tests {
386381

387382
let schema = fields_to_map(
388383
[
389-
Field::new("a", DataType::Int64, true),
384+
Field::new("a", DataType::Float64, true),
390385
Field::new("b", DataType::Utf8, true),
391386
Field::new("c", DataType::Float64, true),
392387
]
@@ -400,8 +395,8 @@ mod tests {
400395
assert_eq!(rb.num_rows(), 1);
401396
assert_eq!(rb.num_columns(), 5);
402397
assert_eq!(
403-
rb.column_by_name("a").unwrap().as_int64_arr(),
404-
&Int64Array::from_iter([1])
398+
rb.column_by_name("a").unwrap().as_float64_arr(),
399+
&Float64Array::from_iter([1.0])
405400
);
406401
assert_eq!(
407402
rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -418,7 +413,7 @@ mod tests {
418413

419414
let schema = fields_to_map(
420415
[
421-
Field::new("a", DataType::Int64, true),
416+
Field::new("a", DataType::Float64, true),
422417
Field::new("b", DataType::Utf8, true),
423418
Field::new("c", DataType::Float64, true),
424419
]
@@ -488,21 +483,21 @@ mod tests {
488483
let schema = rb.schema();
489484
let fields = &schema.fields;
490485

491-
assert_eq!(&*fields[1], &Field::new("a", DataType::Int64, true));
486+
assert_eq!(&*fields[1], &Field::new("a", DataType::Float64, true));
492487
assert_eq!(&*fields[2], &Field::new("b", DataType::Utf8, true));
493-
assert_eq!(&*fields[3], &Field::new("c", DataType::Int64, true));
488+
assert_eq!(&*fields[3], &Field::new("c", DataType::Float64, true));
494489

495490
assert_eq!(
496-
rb.column_by_name("a").unwrap().as_int64_arr(),
497-
&Int64Array::from(vec![None, Some(1), Some(1)])
491+
rb.column_by_name("a").unwrap().as_float64_arr(),
492+
&Float64Array::from(vec![None, Some(1.0), Some(1.0)])
498493
);
499494
assert_eq!(
500495
rb.column_by_name("b").unwrap().as_utf8_arr(),
501496
&StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),])
502497
);
503498
assert_eq!(
504-
rb.column_by_name("c").unwrap().as_int64_arr(),
505-
&Int64Array::from(vec![None, Some(1), None])
499+
rb.column_by_name("c").unwrap().as_float64_arr(),
500+
&Float64Array::from(vec![None, Some(1.0), None])
506501
);
507502
}
508503

@@ -533,8 +528,8 @@ mod tests {
533528
assert_eq!(rb.num_rows(), 3);
534529
assert_eq!(rb.num_columns(), 6);
535530
assert_eq!(
536-
rb.column_by_name("a").unwrap().as_int64_arr(),
537-
&Int64Array::from(vec![None, Some(1), Some(1)])
531+
rb.column_by_name("a").unwrap().as_float64_arr(),
532+
&Float64Array::from(vec![None, Some(1.0), Some(1.0)])
538533
);
539534
assert_eq!(
540535
rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -568,7 +563,7 @@ mod tests {
568563

569564
let schema = fields_to_map(
570565
[
571-
Field::new("a", DataType::Int64, true),
566+
Field::new("a", DataType::Float64, true),
572567
Field::new("b", DataType::Utf8, true),
573568
Field::new("c", DataType::Float64, true),
574569
]
@@ -581,8 +576,8 @@ mod tests {
581576
assert_eq!(rb.num_rows(), 3);
582577
assert_eq!(rb.num_columns(), 6);
583578
assert_eq!(
584-
rb.column_by_name("a").unwrap().as_int64_arr(),
585-
&Int64Array::from(vec![None, Some(1), Some(1)])
579+
rb.column_by_name("a").unwrap().as_float64_arr(),
580+
&Float64Array::from(vec![None, Some(1.0), Some(1.0)])
586581
);
587582
assert_eq!(
588583
rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -608,7 +603,7 @@ mod tests {
608603
"c": 1
609604
},
610605
{
611-
"a": 1,
606+
"a": "1",
612607
"b": "hello",
613608
"c": null
614609
},
@@ -618,7 +613,7 @@ mod tests {
618613

619614
let schema = fields_to_map(
620615
[
621-
Field::new("a", DataType::Int64, true),
616+
Field::new("a", DataType::Float64, true),
622617
Field::new("b", DataType::Utf8, true),
623618
Field::new("c", DataType::Float64, true),
624619
]
@@ -658,8 +653,8 @@ mod tests {
658653
assert_eq!(rb.num_rows(), 4);
659654
assert_eq!(rb.num_columns(), 7);
660655
assert_eq!(
661-
rb.column_by_name("a").unwrap().as_int64_arr(),
662-
&Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)])
656+
rb.column_by_name("a").unwrap().as_float64_arr(),
657+
&Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)])
663658
);
664659
assert_eq!(
665660
rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -672,13 +667,13 @@ mod tests {
672667
);
673668

674669
assert_eq!(
675-
rb.column_by_name("c_a").unwrap().as_int64_arr(),
676-
&Int64Array::from(vec![None, None, Some(1), Some(1)])
670+
rb.column_by_name("c_a").unwrap().as_float64_arr(),
671+
&Float64Array::from(vec![None, None, Some(1.0), Some(1.0)])
677672
);
678673

679674
assert_eq!(
680-
rb.column_by_name("c_b").unwrap().as_int64_arr(),
681-
&Int64Array::from(vec![None, None, None, Some(2)])
675+
rb.column_by_name("c_b").unwrap().as_float64_arr(),
676+
&Float64Array::from(vec![None, None, None, Some(2.0)])
682677
);
683678
}
684679
}

src/metadata.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,20 @@ impl StreamInfo {
164164
Ok(Arc::new(schema))
165165
}
166166

167+
/// Update the schema stored in the metadata for the given stream
168+
pub fn set_schema(
169+
&self,
170+
stream_name: &str,
171+
schema: HashMap<String, Arc<Field>>,
172+
) -> Result<(), MetadataError> {
173+
let mut map = self.write().expect(LOCK_EXPECT);
174+
map.get_mut(stream_name)
175+
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
176+
.map(|metadata| {
177+
metadata.schema = schema;
178+
})
179+
}
180+
167181
pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> {
168182
let mut map = self.write().expect(LOCK_EXPECT);
169183
map.get_mut(stream_name)

src/storage/object_storage.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use super::{
2525
SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
2626
};
2727

28+
use crate::event::format::override_num_fields_from_schema;
2829
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
2930
use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
3031
use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
@@ -40,7 +41,7 @@ use crate::{
4041
};
4142

4243
use actix_web_prometheus::PrometheusMetrics;
43-
use arrow_schema::Schema;
44+
use arrow_schema::{Field, Schema};
4445
use async_trait::async_trait;
4546
use bytes::Bytes;
4647
use chrono::Local;
@@ -667,8 +668,21 @@ pub async fn commit_schema_to_storage(
667668
schema: Schema,
668669
) -> Result<(), ObjectStorageError> {
669670
let storage = CONFIG.storage().get_object_store();
670-
let stream_schema = storage.get_schema(stream_name).await?;
671+
let mut stream_schema = storage.get_schema(stream_name).await?;
672+
// override the data type of all numeric fields to Float64
673+
// if the data type is not already Float64
674+
stream_schema = Schema::new(override_num_fields_from_schema(
675+
stream_schema.fields().iter().cloned().collect(),
676+
));
671677
let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
678+
679+
// Update the merged schema in the metadata and storage
680+
let schema_map: HashMap<String, Arc<Field>> = new_schema
681+
.fields()
682+
.iter()
683+
.map(|field| (field.name().clone(), Arc::clone(field)))
684+
.collect();
685+
let _ = STREAM_INFO.set_schema(stream_name, schema_map);
672686
storage.put_schema(stream_name, &new_schema).await
673687
}
674688

0 commit comments

Comments
 (0)