Skip to content

Commit 3a4973e

Browse files
committed
feat: ignore timezone info when copy from external files
1 parent 61e3fca commit 3a4973e

File tree

6 files changed

+199
-19
lines changed

6 files changed

+199
-19
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/datanode/src/error.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,11 +418,13 @@ pub enum Error {
418418
},
419419

420420
#[snafu(display(
421-
"File Schema mismatch, expected table schema: {} but found :{}",
421+
"File schema mismatch at index {}, expected table schema: {} but found :{}",
422+
index,
422423
table_schema,
423424
file_schema
424425
))]
425426
InvalidSchema {
427+
index: usize,
426428
table_schema: String,
427429
file_schema: String,
428430
},

src/datanode/src/sql/copy_table_from.rs

Lines changed: 137 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,12 @@ use common_datasource::util::find_dir_and_filename;
2121
use common_query::Output;
2222
use common_recordbatch::error::DataTypesSnafu;
2323
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
24+
use datatypes::arrow::datatypes::{DataType, SchemaRef};
2425
use datatypes::arrow::record_batch::RecordBatch;
2526
use datatypes::vectors::{Helper, VectorRef};
2627
use futures_util::TryStreamExt;
2728
use regex::Regex;
28-
use snafu::{ensure, ResultExt};
29+
use snafu::ResultExt;
2930
use table::engine::TableReference;
3031
use table::requests::{CopyTableRequest, InsertRequest};
3132
use tokio::io::BufReader;
@@ -80,13 +81,7 @@ impl SqlHandler {
8081
.await
8182
.context(error::ReadParquetSnafu)?;
8283

83-
ensure!(
84-
builder.schema() == table.schema().arrow_schema(),
85-
error::InvalidSchemaSnafu {
86-
table_schema: table.schema().arrow_schema().to_string(),
87-
file_schema: (*(builder.schema())).to_string()
88-
}
89-
);
84+
ensure_schema_matches_ignore_timezone(builder.schema(), table.schema().arrow_schema())?;
9085

9186
let stream = builder
9287
.build()
@@ -143,3 +138,137 @@ impl SqlHandler {
143138
Ok(Output::AffectedRows(result.iter().sum()))
144139
}
145140
}
141+
142+
fn ensure_schema_matches_ignore_timezone(left: &SchemaRef, right: &SchemaRef) -> Result<()> {
143+
let not_match = left
144+
.fields
145+
.iter()
146+
.zip(right.fields.iter())
147+
.map(|(l, r)| (l.data_type(), r.data_type()))
148+
.enumerate()
149+
.find(|(_, (l, r))| !data_type_equals_ignore_timezone(l, r));
150+
151+
if let Some((index, _)) = not_match {
152+
error::InvalidSchemaSnafu {
153+
index,
154+
table_schema: left.to_string(),
155+
file_schema: right.to_string(),
156+
}
157+
.fail()
158+
} else {
159+
Ok(())
160+
}
161+
}
162+
163+
fn data_type_equals_ignore_timezone(l: &DataType, r: &DataType) -> bool {
164+
match (l, r) {
165+
(DataType::List(a), DataType::List(b))
166+
| (DataType::LargeList(a), DataType::LargeList(b)) => {
167+
a.is_nullable() == b.is_nullable()
168+
&& data_type_equals_ignore_timezone(a.data_type(), b.data_type())
169+
}
170+
(DataType::FixedSizeList(a, a_size), DataType::FixedSizeList(b, b_size)) => {
171+
a_size == b_size
172+
&& a.is_nullable() == b.is_nullable()
173+
&& data_type_equals_ignore_timezone(a.data_type(), b.data_type())
174+
}
175+
(DataType::Struct(a), DataType::Struct(b)) => {
176+
a.len() == b.len()
177+
&& a.iter().zip(b).all(|(a, b)| {
178+
a.is_nullable() == b.is_nullable()
179+
&& data_type_equals_ignore_timezone(a.data_type(), b.data_type())
180+
})
181+
}
182+
(DataType::Map(a_field, a_is_sorted), DataType::Map(b_field, b_is_sorted)) => {
183+
a_field == b_field && a_is_sorted == b_is_sorted
184+
}
185+
(DataType::Timestamp(l_unit, _), DataType::Timestamp(r_unit, _)) => l_unit == r_unit,
186+
_ => l == r,
187+
}
188+
}
189+
190+
#[cfg(test)]
191+
mod tests {
192+
use std::sync::Arc;
193+
194+
use datatypes::arrow::datatypes::{Field, Schema};
195+
196+
use super::*;
197+
198+
fn test_schema_matches(l: (DataType, bool), r: (DataType, bool), matches: bool) {
199+
let s1 = Arc::new(Schema::new(vec![Field::new("col", l.0, l.1)]));
200+
let s2 = Arc::new(Schema::new(vec![Field::new("col", r.0, r.1)]));
201+
let res = ensure_schema_matches_ignore_timezone(&s1, &s2);
202+
assert_eq!(matches, res.is_ok())
203+
}
204+
205+
#[test]
206+
fn test_ensure_datatype_matches_ignore_timezone() {
207+
test_schema_matches(
208+
(
209+
DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
210+
true,
211+
),
212+
(
213+
DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
214+
true,
215+
),
216+
true,
217+
);
218+
219+
test_schema_matches(
220+
(
221+
DataType::Timestamp(
222+
datatypes::arrow::datatypes::TimeUnit::Second,
223+
Some("UTC".to_string()),
224+
),
225+
true,
226+
),
227+
(
228+
DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, None),
229+
true,
230+
),
231+
true,
232+
);
233+
234+
test_schema_matches(
235+
(
236+
DataType::Timestamp(
237+
datatypes::arrow::datatypes::TimeUnit::Second,
238+
Some("UTC".to_string()),
239+
),
240+
true,
241+
),
242+
(
243+
DataType::Timestamp(
244+
datatypes::arrow::datatypes::TimeUnit::Second,
245+
Some("PDT".to_string()),
246+
),
247+
true,
248+
),
249+
true,
250+
);
251+
252+
test_schema_matches(
253+
(
254+
DataType::Timestamp(
255+
datatypes::arrow::datatypes::TimeUnit::Second,
256+
Some("UTC".to_string()),
257+
),
258+
true,
259+
),
260+
(
261+
DataType::Timestamp(
262+
datatypes::arrow::datatypes::TimeUnit::Millisecond,
263+
Some("UTC".to_string()),
264+
),
265+
true,
266+
),
267+
false,
268+
);
269+
270+
test_schema_matches((DataType::Int8, true), (DataType::Int8, true), true);
271+
272+
test_schema_matches((DataType::Int8, true), (DataType::Int16, true), false);
273+
}
274+
}

src/datatypes/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ test = []
1010

1111
[dependencies]
1212
arrow.workspace = true
13+
arrow-array = "36"
1314
arrow-schema.workspace = true
1415
common-base = { path = "../common/base" }
1516
common-error = { path = "../common/error" }

src/datatypes/src/vectors/helper.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -238,16 +238,18 @@ impl Helper {
238238
ArrowDataType::Date64 => Arc::new(DateTimeVector::try_from_arrow_array(array)?),
239239
ArrowDataType::List(_) => Arc::new(ListVector::try_from_arrow_array(array)?),
240240
ArrowDataType::Timestamp(unit, _) => match unit {
241-
TimeUnit::Second => Arc::new(TimestampSecondVector::try_from_arrow_array(array)?),
242-
TimeUnit::Millisecond => {
243-
Arc::new(TimestampMillisecondVector::try_from_arrow_array(array)?)
244-
}
245-
TimeUnit::Microsecond => {
246-
Arc::new(TimestampMicrosecondVector::try_from_arrow_array(array)?)
247-
}
248-
TimeUnit::Nanosecond => {
249-
Arc::new(TimestampNanosecondVector::try_from_arrow_array(array)?)
250-
}
241+
TimeUnit::Second => Arc::new(
242+
TimestampSecondVector::try_from_arrow_timestamp_array(array)?,
243+
),
244+
TimeUnit::Millisecond => Arc::new(
245+
TimestampMillisecondVector::try_from_arrow_timestamp_array(array)?,
246+
),
247+
TimeUnit::Microsecond => Arc::new(
248+
TimestampMicrosecondVector::try_from_arrow_timestamp_array(array)?,
249+
),
250+
TimeUnit::Nanosecond => Arc::new(
251+
TimestampNanosecondVector::try_from_arrow_timestamp_array(array)?,
252+
),
251253
},
252254
ArrowDataType::Float16
253255
| ArrowDataType::Time32(_)

src/datatypes/src/vectors/primitive.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ use std::sync::Arc;
1818

1919
use arrow::array::{
2020
Array, ArrayBuilder, ArrayData, ArrayIter, ArrayRef, PrimitiveArray, PrimitiveBuilder,
21+
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
22+
TimestampSecondArray,
2123
};
24+
use arrow_schema::DataType;
2225
use serde_json::Value as JsonValue;
2326
use snafu::OptionExt;
2427

@@ -70,6 +73,48 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
7073
Ok(Self::new(concrete_array))
7174
}
7275

76+
/// Converts arrow timestamp array to vectors, ignoring time zone info.
77+
pub fn try_from_arrow_timestamp_array(array: impl AsRef<dyn Array>) -> Result<Self> {
78+
let array = array.as_ref();
79+
let array_data = match array.data_type() {
80+
DataType::Timestamp(unit, _) => match unit {
81+
arrow_schema::TimeUnit::Second => array
82+
.as_any()
83+
.downcast_ref::<TimestampSecondArray>()
84+
.unwrap()
85+
.with_timezone_opt(None)
86+
.data()
87+
.clone(),
88+
arrow_schema::TimeUnit::Millisecond => array
89+
.as_any()
90+
.downcast_ref::<TimestampMillisecondArray>()
91+
.unwrap()
92+
.with_timezone_opt(None)
93+
.data()
94+
.clone(),
95+
arrow_schema::TimeUnit::Microsecond => array
96+
.as_any()
97+
.downcast_ref::<TimestampMicrosecondArray>()
98+
.unwrap()
99+
.with_timezone_opt(None)
100+
.data()
101+
.clone(),
102+
arrow_schema::TimeUnit::Nanosecond => array
103+
.as_any()
104+
.downcast_ref::<TimestampNanosecondArray>()
105+
.unwrap()
106+
.with_timezone_opt(None)
107+
.data()
108+
.clone(),
109+
},
110+
_ => {
111+
unreachable!()
112+
}
113+
};
114+
let concrete_array = PrimitiveArray::<T::ArrowPrimitive>::from(array_data);
115+
Ok(Self::new(concrete_array))
116+
}
117+
73118
pub fn from_slice<P: AsRef<[T::Native]>>(slice: P) -> Self {
74119
let iter = slice.as_ref().iter().copied();
75120
Self {

0 commit comments

Comments
 (0)