From 5909bfd9bb45c6e66247b95fd847b9547bc4e02c Mon Sep 17 00:00:00 2001
From: "Lei, HUANG"
Date: Thu, 6 Apr 2023 23:14:39 +0800
Subject: [PATCH 1/2] feat: ignore timezone info when copying from external files

---
 Cargo.lock                              |   1 +
 src/datanode/src/error.rs               |   4 +-
 src/datanode/src/sql/copy_table_from.rs | 146 ++++++++++++++++++++++--
 src/datatypes/Cargo.toml                |   1 +
 src/datatypes/src/vectors/helper.rs     |  22 ++--
 src/datatypes/src/vectors/primitive.rs  |  45 ++++++++
 6 files changed, 200 insertions(+), 19 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4d0667b95f4d..cb58cbd24231 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2441,6 +2441,7 @@ name = "datatypes"
 version = "0.1.1"
 dependencies = [
  "arrow",
+ "arrow-array",
  "arrow-schema",
  "common-base",
  "common-error",
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 6e627793def9..276e78526184 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -423,11 +423,13 @@ pub enum Error {
     },
 
     #[snafu(display(
-        "File Schema mismatch, expected table schema: {} but found :{}",
+        "File schema mismatch at index {}, expected table schema: {} but found :{}",
+        index,
         table_schema,
         file_schema
     ))]
     InvalidSchema {
+        index: usize,
         table_schema: String,
         file_schema: String,
     },
diff --git a/src/datanode/src/sql/copy_table_from.rs b/src/datanode/src/sql/copy_table_from.rs
index 9b79ad3d7160..42aa3d25bb49 100644
--- a/src/datanode/src/sql/copy_table_from.rs
+++ b/src/datanode/src/sql/copy_table_from.rs
@@ -23,10 +23,12 @@ use common_datasource::util::find_dir_and_filename;
 use common_query::Output;
 use common_recordbatch::error::DataTypesSnafu;
 use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
+use datatypes::arrow::datatypes::{DataType, SchemaRef};
+use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::vectors::Helper;
 use futures_util::StreamExt;
 use regex::Regex;
-use snafu::{ensure, ResultExt};
+use snafu::ResultExt;
 use table::engine::TableReference;
 use table::requests::{CopyTableRequest, InsertRequest};
 use tokio::io::BufReader;
@@ -88,13 +90,7 @@ impl SqlHandler {
             .await
             .context(error::ReadParquetSnafu)?;
 
-        ensure!(
-            builder.schema() == table.schema().arrow_schema(),
-            error::InvalidSchemaSnafu {
-                table_schema: table.schema().arrow_schema().to_string(),
-                file_schema: (*(builder.schema())).to_string()
-            }
-        );
+        ensure_schema_matches_ignore_timezone(builder.schema(), table.schema().arrow_schema())?;
 
         let mut stream = builder
             .build()
@@ -159,3 +155,137 @@ async fn batch_insert(
     *pending_bytes = 0;
     Ok(res)
 }
+
+fn ensure_schema_matches_ignore_timezone(left: &SchemaRef, right: &SchemaRef) -> Result<()> {
+    let not_match = left
+        .fields
+        .iter()
+        .zip(right.fields.iter())
+        .map(|(l, r)| (l.data_type(), r.data_type()))
+        .enumerate()
+        .find(|(_, (l, r))| !data_type_equals_ignore_timezone(l, r));
+
+    if let Some((index, _)) = not_match {
+        error::InvalidSchemaSnafu {
+            index,
+            table_schema: right.to_string(),
+            file_schema: left.to_string(),
+        }
+        .fail()
+    } else {
+        Ok(())
+    }
+}
+
+fn data_type_equals_ignore_timezone(l: &DataType, r: &DataType) -> bool {
+    match (l, r) {
+        (DataType::List(a), DataType::List(b))
+        | (DataType::LargeList(a), DataType::LargeList(b)) => {
+            a.is_nullable() == b.is_nullable()
+                && data_type_equals_ignore_timezone(a.data_type(), b.data_type())
+        }
+        (DataType::FixedSizeList(a, a_size), DataType::FixedSizeList(b, b_size)) => {
+            a_size == b_size
+                && a.is_nullable() == b.is_nullable()
+                && data_type_equals_ignore_timezone(a.data_type(), b.data_type())
+        }
+        (DataType::Struct(a), DataType::Struct(b)) => {
+            a.len() == b.len()
+                && a.iter().zip(b).all(|(a, b)| {
+                    a.is_nullable() == b.is_nullable()
+                        && data_type_equals_ignore_timezone(a.data_type(), b.data_type())
+                })
+        }
+        (DataType::Map(a_field, a_is_sorted), DataType::Map(b_field, b_is_sorted)) => {
+            a_field == b_field && a_is_sorted == b_is_sorted
+        }
+        (DataType::Timestamp(l_unit, _), DataType::Timestamp(r_unit, _)) => l_unit == r_unit,
+        _ => l == r,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use datatypes::arrow::datatypes::{Field, Schema};
+
+    use super::*;
+
+    fn test_schema_matches(l: (DataType, bool), r: (DataType, bool), matches: bool) {
+        let s1 = Arc::new(Schema::new(vec![Field::new("col", l.0, l.1)]));
+        let s2 = Arc::new(Schema::new(vec![Field::new("col", r.0, r.1)]));
+        let res = ensure_schema_matches_ignore_timezone(&s1, &s2);
+        assert_eq!(matches, res.is_ok())
+    }
+
+    #[test]
+    fn test_ensure_datatype_matches_ignore_timezone() {
+        test_schema_matches(
+            (
+                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
+                true,
+            ),
+            (
+                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
+                true,
+            ),
+            true,
+        );
+
+        test_schema_matches(
+            (
+                DataType::Timestamp(
+                    datatypes::arrow::datatypes::TimeUnit::Second,
+                    Some("UTC".to_string()),
+                ),
+                true,
+            ),
+            (
+                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
+                true,
+            ),
+            true,
+        );
+
+        test_schema_matches(
+            (
+                DataType::Timestamp(
+                    datatypes::arrow::datatypes::TimeUnit::Second,
+                    Some("UTC".to_string()),
+                ),
+                true,
+            ),
+            (
+                DataType::Timestamp(
+                    datatypes::arrow::datatypes::TimeUnit::Second,
+                    Some("PDT".to_string()),
+                ),
+                true,
+            ),
+            true,
+        );
+
+        test_schema_matches(
+            (
+                DataType::Timestamp(
+                    datatypes::arrow::datatypes::TimeUnit::Second,
+                    Some("UTC".to_string()),
+                ),
+                true,
+            ),
+            (
+                DataType::Timestamp(
+                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
+                    Some("UTC".to_string()),
+                ),
+                true,
+            ),
+            false,
+        );
+
+        test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);
+
+        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), false);
+    }
+}
diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml
index 0d0158a3b0ab..dd85c58ce35a 100644
--- a/src/datatypes/Cargo.toml
+++ b/src/datatypes/Cargo.toml
@@ -10,6 +10,7 @@ test = []
 
 [dependencies]
 arrow.workspace = true
+arrow-array = "36"
 arrow-schema.workspace = true
 common-base = { path = "../common/base" }
 common-error = { path = "../common/error" }
diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs
index caaf1eeef39c..d2c903c9c38e 100644
--- a/src/datatypes/src/vectors/helper.rs
+++ b/src/datatypes/src/vectors/helper.rs
@@ -238,16 +238,18 @@ impl Helper {
             ArrowDataType::Date64 => Arc::new(DateTimeVector::try_from_arrow_array(array)?),
             ArrowDataType::List(_) => Arc::new(ListVector::try_from_arrow_array(array)?),
             ArrowDataType::Timestamp(unit, _) => match unit {
-                TimeUnit::Second => Arc::new(TimestampSecondVector::try_from_arrow_array(array)?),
-                TimeUnit::Millisecond => {
-                    Arc::new(TimestampMillisecondVector::try_from_arrow_array(array)?)
-                }
-                TimeUnit::Microsecond => {
-                    Arc::new(TimestampMicrosecondVector::try_from_arrow_array(array)?)
-                }
-                TimeUnit::Nanosecond => {
-                    Arc::new(TimestampNanosecondVector::try_from_arrow_array(array)?)
-                }
+                TimeUnit::Second => Arc::new(
+                    TimestampSecondVector::try_from_arrow_timestamp_array(array)?,
+                ),
+                TimeUnit::Millisecond => Arc::new(
+                    TimestampMillisecondVector::try_from_arrow_timestamp_array(array)?,
+                ),
+                TimeUnit::Microsecond => Arc::new(
+                    TimestampMicrosecondVector::try_from_arrow_timestamp_array(array)?,
+                ),
+                TimeUnit::Nanosecond => Arc::new(
+                    TimestampNanosecondVector::try_from_arrow_timestamp_array(array)?,
+                ),
             },
             ArrowDataType::Float16
             | ArrowDataType::Time32(_)
diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs
index 219e5ae34b24..bf94e74bb0d6 100644
--- a/src/datatypes/src/vectors/primitive.rs
+++ b/src/datatypes/src/vectors/primitive.rs
@@ -18,7 +18,10 @@ use std::sync::Arc;
 
 use arrow::array::{
     Array, ArrayBuilder, ArrayData, ArrayIter, ArrayRef, PrimitiveArray, PrimitiveBuilder,
+    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+    TimestampSecondArray,
 };
+use arrow_schema::DataType;
 use serde_json::Value as JsonValue;
 use snafu::OptionExt;
 
@@ -70,6 +73,48 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
         Ok(Self::new(concrete_array))
     }
+
+    /// Converts arrow timestamp array to vectors, ignoring time zone info.
+    pub fn try_from_arrow_timestamp_array(array: impl AsRef<dyn Array>) -> Result<Self> {
+        let array = array.as_ref();
+        let array_data = match array.data_type() {
+            DataType::Timestamp(unit, _) => match unit {
+                arrow_schema::TimeUnit::Second => array
+                    .as_any()
+                    .downcast_ref::<TimestampSecondArray>()
+                    .unwrap()
+                    .with_timezone_opt(None)
+                    .data()
+                    .clone(),
+                arrow_schema::TimeUnit::Millisecond => array
+                    .as_any()
+                    .downcast_ref::<TimestampMillisecondArray>()
+                    .unwrap()
+                    .with_timezone_opt(None)
+                    .data()
+                    .clone(),
+                arrow_schema::TimeUnit::Microsecond => array
+                    .as_any()
+                    .downcast_ref::<TimestampMicrosecondArray>()
+                    .unwrap()
+                    .with_timezone_opt(None)
+                    .data()
+                    .clone(),
+                arrow_schema::TimeUnit::Nanosecond => array
+                    .as_any()
+                    .downcast_ref::<TimestampNanosecondArray>()
+                    .unwrap()
+                    .with_timezone_opt(None)
+                    .data()
+                    .clone(),
+            },
+            _ => {
+                unreachable!()
+            }
+        };
+        let concrete_array = PrimitiveArray::<T::ArrowPrimitive>::from(array_data);
+        Ok(Self::new(concrete_array))
+    }
 
     pub fn from_slice<P: AsRef<[T::Native]>>(slice: P) -> Self {
         let iter = slice.as_ref().iter().copied();
         Self {

From acb839c6ffa5200fff4774d86e49c52cfeabfa46 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG"
Date: Mon, 10 Apr 2023 11:16:13 +0800
Subject: [PATCH 2/2] chore: rebase onto develop

---
 src/datanode/src/error.rs               | 2 +-
 src/datanode/src/sql/copy_table_from.rs | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 276e78526184..3deebad3a75c 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -423,7 +423,7 @@ pub enum Error {
     },
 
     #[snafu(display(
-        "File schema mismatch at index {}, expected table schema: {} but found :{}",
+        "File schema mismatch at index {}, expected table schema: {} but found: {}",
         index,
         table_schema,
         file_schema
diff --git a/src/datanode/src/sql/copy_table_from.rs b/src/datanode/src/sql/copy_table_from.rs
index 42aa3d25bb49..dc22c6523362 100644
--- a/src/datanode/src/sql/copy_table_from.rs
+++ b/src/datanode/src/sql/copy_table_from.rs
@@ -24,7 +24,6 @@ use common_query::Output;
 use common_recordbatch::error::DataTypesSnafu;
 use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
 use datatypes::arrow::datatypes::{DataType, SchemaRef};
-use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::vectors::Helper;
 use futures_util::StreamExt;
 use regex::Regex;
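
Usage sketch (not part of either patch): with this series applied, a Parquet column declared as Timestamp(Second, Some("UTC")) can be copied into a table column declared as Timestamp(Second, None). The snippet below illustrates the new constructor in isolation; it assumes the datatypes crate re-exports arrow::array the same way it re-exports arrow::datatypes in the diffs above, and the array values are made up.

    use std::sync::Arc;

    use datatypes::arrow::array::{ArrayRef, TimestampSecondArray};
    use datatypes::vectors::{TimestampSecondVector, Vector};

    // A file-side column annotated with a "UTC" time zone.
    let with_tz: ArrayRef = Arc::new(
        TimestampSecondArray::from(vec![1_000_i64, 2_000]).with_timezone("UTC".to_string()),
    );

    // Before this series, the strict schema equality check in COPY FROM
    // rejected such a column because Timestamp(Second, Some("UTC")) is not
    // equal to Timestamp(Second, None). The new constructor drops the time
    // zone annotation and keeps the raw epoch values unchanged.
    let vector = TimestampSecondVector::try_from_arrow_timestamp_array(with_tz).unwrap();
    assert_eq!(2, vector.len());

Note that ensure_schema_matches_ignore_timezone still compares time units, so a file with millisecond timestamps is rejected for a second-resolution column, as the fourth test case above verifies.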