Skip to content

Commit 702dbc8

Browse files
authored
Revert "fix: convert all number data types to float" (#1038)
This reverts commit 6d3632a.
1 parent fc44fca commit 702dbc8

File tree

6 files changed

+36
-91
lines changed

6 files changed

+36
-91
lines changed

src/catalog/column.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,6 @@ impl TypedStatistics {
6666
max: max(this.max, other.max),
6767
})
6868
}
69-
70-
// Ints are cast to Float if self is Float and other is Int
71-
(TypedStatistics::Float(this), TypedStatistics::Int(other)) => {
72-
TypedStatistics::Float(Float64Type {
73-
min: this.min.min(other.min as f64),
74-
max: this.max.max(other.max as f64),
75-
})
76-
}
7769
(TypedStatistics::Float(this), TypedStatistics::Float(other)) => {
7870
TypedStatistics::Float(Float64Type {
7971
min: this.min.min(other.min),

src/event/format/json.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,9 +185,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
185185
DataType::Boolean => value.is_boolean(),
186186
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(),
187187
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(),
188-
DataType::Float16 | DataType::Float32 => value.is_f64(),
189-
// NOTE: All numbers can be ingested as Float64
190-
DataType::Float64 => value.is_number(),
188+
DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(),
191189
DataType::Utf8 => value.is_string(),
192190
DataType::List(field) => {
193191
let data_type = field.data_type();

src/event/format/mod.rs

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -204,24 +204,6 @@ pub fn override_timestamp_fields(
204204
Arc::new(Schema::new(updated_fields))
205205
}
206206

207-
/// All number fields from inferred schema are forced into Float64
208-
pub fn override_num_fields_from_schema(schema: Vec<Arc<Field>>) -> Vec<Arc<Field>> {
209-
schema
210-
.iter()
211-
.map(|field| {
212-
if field.data_type().is_numeric() {
213-
Arc::new(Field::new(
214-
field.name(),
215-
DataType::Float64,
216-
field.is_nullable(),
217-
))
218-
} else {
219-
field.clone()
220-
}
221-
})
222-
.collect::<Vec<Arc<Field>>>()
223-
}
224-
225207
pub fn update_field_type_in_schema(
226208
inferred_schema: Arc<Schema>,
227209
existing_schema: Option<&HashMap<String, Arc<Field>>>,
@@ -230,10 +212,6 @@ pub fn update_field_type_in_schema(
230212
) -> Arc<Schema> {
231213
let mut updated_schema = inferred_schema.clone();
232214

233-
// All number fields from inferred schema are forced into Float64
234-
updated_schema = Arc::new(Schema::new(override_num_fields_from_schema(
235-
updated_schema.fields().to_vec(),
236-
)));
237215
if let Some(existing_schema) = existing_schema {
238216
let existing_fields = get_existing_fields(inferred_schema.clone(), Some(existing_schema));
239217
let existing_timestamp_fields = get_existing_timestamp_fields(existing_schema);

src/handlers/http/ingest.rs

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ mod tests {
273273
use std::{collections::HashMap, sync::Arc};
274274

275275
use actix_web::test::TestRequest;
276-
use arrow_array::{ArrayRef, Float64Array, StringArray};
276+
use arrow_array::{ArrayRef, Float64Array, Int64Array, StringArray};
277277
use arrow_schema::{DataType, Field};
278278
use serde_json::json;
279279

@@ -283,11 +283,16 @@ mod tests {
283283
};
284284

285285
trait TestExt {
286+
fn as_int64_arr(&self) -> &Int64Array;
286287
fn as_float64_arr(&self) -> &Float64Array;
287288
fn as_utf8_arr(&self) -> &StringArray;
288289
}
289290

290291
impl TestExt for ArrayRef {
292+
fn as_int64_arr(&self) -> &Int64Array {
293+
self.as_any().downcast_ref().unwrap()
294+
}
295+
291296
fn as_float64_arr(&self) -> &Float64Array {
292297
self.as_any().downcast_ref().unwrap()
293298
}
@@ -319,8 +324,8 @@ mod tests {
319324
assert_eq!(rb.num_rows(), 1);
320325
assert_eq!(rb.num_columns(), 6);
321326
assert_eq!(
322-
rb.column_by_name("a").unwrap().as_float64_arr(),
323-
&Float64Array::from_iter([1.0])
327+
rb.column_by_name("a").unwrap().as_int64_arr(),
328+
&Int64Array::from_iter([1])
324329
);
325330
assert_eq!(
326331
rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -359,8 +364,8 @@ mod tests {
359364
assert_eq!(rb.num_rows(), 1);
360365
assert_eq!(rb.num_columns(), 5);
361366
assert_eq!(
362-
rb.column_by_name("a").unwrap().as_float64_arr(),
363-
&Float64Array::from_iter([1.0])
367+
rb.column_by_name("a").unwrap().as_int64_arr(),
368+
&Int64Array::from_iter([1])
364369
);
365370
assert_eq!(
366371
rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -377,7 +382,7 @@ mod tests {
377382

378383
let schema = fields_to_map(
379384
[
380-
Field::new("a", DataType::Float64, true),
385+
Field::new("a", DataType::Int64, true),
381386
Field::new("b", DataType::Utf8, true),
382387
Field::new("c", DataType::Float64, true),
383388
]
@@ -391,8 +396,8 @@ mod tests {
391396
assert_eq!(rb.num_rows(), 1);
392397
assert_eq!(rb.num_columns(), 5);
393398
assert_eq!(
394-
rb.column_by_name("a").unwrap().as_float64_arr(),
395-
&Float64Array::from_iter([1.0])
399+
rb.column_by_name("a").unwrap().as_int64_arr(),
400+
&Int64Array::from_iter([1])
396401
);
397402
assert_eq!(
398403
rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -409,7 +414,7 @@ mod tests {
409414

410415
let schema = fields_to_map(
411416
[
412-
Field::new("a", DataType::Float64, true),
417+
Field::new("a", DataType::Int64, true),
413418
Field::new("b", DataType::Utf8, true),
414419
Field::new("c", DataType::Float64, true),
415420
]
@@ -479,21 +484,21 @@ mod tests {
479484
let schema = rb.schema();
480485
let fields = &schema.fields;
481486

482-
assert_eq!(&*fields[1], &Field::new("a", DataType::Float64, true));
487+
assert_eq!(&*fields[1], &Field::new("a", DataType::Int64, true));
483488
assert_eq!(&*fields[2], &Field::new("b", DataType::Utf8, true));
484-
assert_eq!(&*fields[3], &Field::new("c", DataType::Float64, true));
489+
assert_eq!(&*fields[3], &Field::new("c", DataType::Int64, true));
485490

486491
assert_eq!(
487-
rb.column_by_name("a").unwrap().as_float64_arr(),
488-
&Float64Array::from(vec![None, Some(1.0), Some(1.0)])
492+
rb.column_by_name("a").unwrap().as_int64_arr(),
493+
&Int64Array::from(vec![None, Some(1), Some(1)])
489494
);
490495
assert_eq!(
491496
rb.column_by_name("b").unwrap().as_utf8_arr(),
492497
&StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),])
493498
);
494499
assert_eq!(
495-
rb.column_by_name("c").unwrap().as_float64_arr(),
496-
&Float64Array::from(vec![None, Some(1.0), None])
500+
rb.column_by_name("c").unwrap().as_int64_arr(),
501+
&Int64Array::from(vec![None, Some(1), None])
497502
);
498503
}
499504

@@ -524,8 +529,8 @@ mod tests {
524529
assert_eq!(rb.num_rows(), 3);
525530
assert_eq!(rb.num_columns(), 6);
526531
assert_eq!(
527-
rb.column_by_name("a").unwrap().as_float64_arr(),
528-
&Float64Array::from(vec![None, Some(1.0), Some(1.0)])
532+
rb.column_by_name("a").unwrap().as_int64_arr(),
533+
&Int64Array::from(vec![None, Some(1), Some(1)])
529534
);
530535
assert_eq!(
531536
rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -559,7 +564,7 @@ mod tests {
559564

560565
let schema = fields_to_map(
561566
[
562-
Field::new("a", DataType::Float64, true),
567+
Field::new("a", DataType::Int64, true),
563568
Field::new("b", DataType::Utf8, true),
564569
Field::new("c", DataType::Float64, true),
565570
]
@@ -572,8 +577,8 @@ mod tests {
572577
assert_eq!(rb.num_rows(), 3);
573578
assert_eq!(rb.num_columns(), 6);
574579
assert_eq!(
575-
rb.column_by_name("a").unwrap().as_float64_arr(),
576-
&Float64Array::from(vec![None, Some(1.0), Some(1.0)])
580+
rb.column_by_name("a").unwrap().as_int64_arr(),
581+
&Int64Array::from(vec![None, Some(1), Some(1)])
577582
);
578583
assert_eq!(
579584
rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -599,7 +604,7 @@ mod tests {
599604
"c": 1
600605
},
601606
{
602-
"a": "1",
607+
"a": 1,
603608
"b": "hello",
604609
"c": null
605610
},
@@ -609,7 +614,7 @@ mod tests {
609614

610615
let schema = fields_to_map(
611616
[
612-
Field::new("a", DataType::Float64, true),
617+
Field::new("a", DataType::Int64, true),
613618
Field::new("b", DataType::Utf8, true),
614619
Field::new("c", DataType::Float64, true),
615620
]
@@ -649,8 +654,8 @@ mod tests {
649654
assert_eq!(rb.num_rows(), 4);
650655
assert_eq!(rb.num_columns(), 7);
651656
assert_eq!(
652-
rb.column_by_name("a").unwrap().as_float64_arr(),
653-
&Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)])
657+
rb.column_by_name("a").unwrap().as_int64_arr(),
658+
&Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)])
654659
);
655660
assert_eq!(
656661
rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -663,13 +668,13 @@ mod tests {
663668
);
664669

665670
assert_eq!(
666-
rb.column_by_name("c_a").unwrap().as_float64_arr(),
667-
&Float64Array::from(vec![None, None, Some(1.0), Some(1.0)])
671+
rb.column_by_name("c_a").unwrap().as_int64_arr(),
672+
&Int64Array::from(vec![None, None, Some(1), Some(1)])
668673
);
669674

670675
assert_eq!(
671-
rb.column_by_name("c_b").unwrap().as_float64_arr(),
672-
&Float64Array::from(vec![None, None, None, Some(2.0)])
676+
rb.column_by_name("c_b").unwrap().as_int64_arr(),
677+
&Int64Array::from(vec![None, None, None, Some(2)])
673678
);
674679
}
675680
}

src/metadata.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -164,20 +164,6 @@ impl StreamInfo {
164164
Ok(Arc::new(schema))
165165
}
166166

167-
/// update the schema in the metadata
168-
pub fn set_schema(
169-
&self,
170-
stream_name: &str,
171-
schema: HashMap<String, Arc<Field>>,
172-
) -> Result<(), MetadataError> {
173-
let mut map = self.write().expect(LOCK_EXPECT);
174-
map.get_mut(stream_name)
175-
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
176-
.map(|metadata| {
177-
metadata.schema = schema;
178-
})
179-
}
180-
181167
pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> {
182168
let mut map = self.write().expect(LOCK_EXPECT);
183169
map.get_mut(stream_name)

src/storage/object_storage.rs

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use super::{
2525
SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
2626
};
2727

28-
use crate::event::format::override_num_fields_from_schema;
2928
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
3029
use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
3130
use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
@@ -40,7 +39,7 @@ use crate::{
4039
};
4140

4241
use actix_web_prometheus::PrometheusMetrics;
43-
use arrow_schema::{Field, Schema};
42+
use arrow_schema::Schema;
4443
use async_trait::async_trait;
4544
use bytes::Bytes;
4645
use chrono::Local;
@@ -633,21 +632,8 @@ pub async fn commit_schema_to_storage(
633632
schema: Schema,
634633
) -> Result<(), ObjectStorageError> {
635634
let storage = CONFIG.storage().get_object_store();
636-
let mut stream_schema = storage.get_schema(stream_name).await?;
637-
// override the data type of all numeric fields to Float64
638-
//if data type is not Float64 already
639-
stream_schema = Schema::new(override_num_fields_from_schema(
640-
stream_schema.fields().iter().cloned().collect(),
641-
));
635+
let stream_schema = storage.get_schema(stream_name).await?;
642636
let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
643-
644-
//update the merged schema in the metadata and storage
645-
let schema_map: HashMap<String, Arc<Field>> = new_schema
646-
.fields()
647-
.iter()
648-
.map(|field| (field.name().clone(), Arc::clone(field)))
649-
.collect();
650-
let _ = STREAM_INFO.set_schema(stream_name, schema_map);
651637
storage.put_schema(stream_name, &new_schema).await
652638
}
653639

0 commit comments

Comments
 (0)