Skip to content

Commit 261b500

Browse files
committed
[task #9511] move date_part, date_trunc, date_bin functions to datafusion-functions
Signed-off-by: tangruilin <[email protected]>
1 parent b7f4772 commit 261b500

File tree

14 files changed

+291
-228
lines changed

14 files changed

+291
-228
lines changed

datafusion-cli/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/expr/src/built_in_function.rs

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,6 @@ pub enum BuiltinScalarFunction {
186186
Concat,
187187
/// concat_ws
188188
ConcatWithSeparator,
189-
/// date_part
190-
DatePart,
191189
/// date_trunc
192190
DateTrunc,
193191
/// date_bin
@@ -391,7 +389,6 @@ impl BuiltinScalarFunction {
391389
BuiltinScalarFunction::Chr => Volatility::Immutable,
392390
BuiltinScalarFunction::Concat => Volatility::Immutable,
393391
BuiltinScalarFunction::ConcatWithSeparator => Volatility::Immutable,
394-
BuiltinScalarFunction::DatePart => Volatility::Immutable,
395392
BuiltinScalarFunction::DateTrunc => Volatility::Immutable,
396393
BuiltinScalarFunction::DateBin => Volatility::Immutable,
397394
BuiltinScalarFunction::EndsWith => Volatility::Immutable,
@@ -617,7 +614,6 @@ impl BuiltinScalarFunction {
617614
}
618615
BuiltinScalarFunction::Concat => Ok(Utf8),
619616
BuiltinScalarFunction::ConcatWithSeparator => Ok(Utf8),
620-
BuiltinScalarFunction::DatePart => Ok(Float64),
621617
BuiltinScalarFunction::DateBin | BuiltinScalarFunction::DateTrunc => {
622618
match &input_expr_types[1] {
623619
Timestamp(Nanosecond, None) | Utf8 | Null => {
@@ -1052,33 +1048,6 @@ impl BuiltinScalarFunction {
10521048

10531049
Signature::one_of(full_sig, self.volatility())
10541050
}
1055-
BuiltinScalarFunction::DatePart => Signature::one_of(
1056-
vec![
1057-
Exact(vec![Utf8, Timestamp(Nanosecond, None)]),
1058-
Exact(vec![
1059-
Utf8,
1060-
Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
1061-
]),
1062-
Exact(vec![Utf8, Timestamp(Millisecond, None)]),
1063-
Exact(vec![
1064-
Utf8,
1065-
Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())),
1066-
]),
1067-
Exact(vec![Utf8, Timestamp(Microsecond, None)]),
1068-
Exact(vec![
1069-
Utf8,
1070-
Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())),
1071-
]),
1072-
Exact(vec![Utf8, Timestamp(Second, None)]),
1073-
Exact(vec![
1074-
Utf8,
1075-
Timestamp(Second, Some(TIMEZONE_WILDCARD.into())),
1076-
]),
1077-
Exact(vec![Utf8, Date64]),
1078-
Exact(vec![Utf8, Date32]),
1079-
],
1080-
self.volatility(),
1081-
),
10821051
BuiltinScalarFunction::SplitPart => Signature::one_of(
10831052
vec![
10841053
Exact(vec![Utf8, Utf8, Int64]),
@@ -1369,7 +1338,6 @@ impl BuiltinScalarFunction {
13691338
BuiltinScalarFunction::MakeDate => &["make_date"],
13701339
BuiltinScalarFunction::DateBin => &["date_bin"],
13711340
BuiltinScalarFunction::DateTrunc => &["date_trunc", "datetrunc"],
1372-
BuiltinScalarFunction::DatePart => &["date_part", "datepart"],
13731341
BuiltinScalarFunction::ToChar => &["to_char", "date_format"],
13741342
BuiltinScalarFunction::FromUnixtime => &["from_unixtime"],
13751343

datafusion/expr/src/expr_fn.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -837,7 +837,6 @@ nary_scalar_expr!(
837837
);
838838

839839
// date functions
840-
scalar_expr!(DatePart, date_part, part date, "extracts a subfield from the date");
841840
scalar_expr!(DateTrunc, date_trunc, part date, "truncates the date to a specified level of precision");
842841
scalar_expr!(DateBin, date_bin, stride source origin, "coerces an arbitrary timestamp to the start of the nearest specified interval");
843842
scalar_expr!(
@@ -1327,7 +1326,6 @@ mod test {
13271326
test_scalar_expr!(Trim, trim, string);
13281327
test_scalar_expr!(Upper, upper, string);
13291328

1330-
test_scalar_expr!(DatePart, date_part, part, date);
13311329
test_scalar_expr!(DateTrunc, date_trunc, part, date);
13321330
test_scalar_expr!(DateBin, date_bin, stride, source, origin);
13331331
test_scalar_expr!(FromUnixtime, from_unixtime, unixtime);
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::sync::Arc;
20+
21+
use arrow::compute::cast;
22+
use arrow::{
23+
array::{Array, ArrayRef, Float64Array, PrimitiveArray},
24+
datatypes::{ArrowNumericType, ArrowTemporalType, DataType},
25+
};
26+
use arrow::{compute::kernels::temporal, datatypes::TimeUnit};
27+
use datafusion_common::cast::{
28+
as_date32_array, as_date64_array, as_timestamp_microsecond_array,
29+
as_timestamp_millisecond_array, as_timestamp_nanosecond_array,
30+
as_timestamp_second_array,
31+
};
32+
use datafusion_common::{exec_err, Result, ScalarValue};
33+
use datafusion_expr::{
34+
ColumnarValue, ScalarUDFImpl, Signature, TypeSignature::Exact, Volatility,
35+
TIMEZONE_WILDCARD,
36+
};
37+
use DataType::{Date32, Date64, Timestamp, Utf8};
38+
use TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
39+
40+
fn to_ticks<T>(array: &PrimitiveArray<T>, frac: i32) -> Result<Float64Array>
41+
where
42+
T: ArrowTemporalType + ArrowNumericType,
43+
i64: From<T::Native>,
44+
{
45+
let zipped = temporal::second(array)?
46+
.values()
47+
.iter()
48+
.zip(temporal::nanosecond(array)?.values().iter())
49+
.map(|o| ((*o.0 as f64 + (*o.1 as f64) / 1_000_000_000.0) * (frac as f64)))
50+
.collect::<Vec<f64>>();
51+
52+
Ok(Float64Array::from(zipped))
53+
}
54+
55+
fn seconds<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
56+
where
57+
T: ArrowTemporalType + ArrowNumericType,
58+
i64: From<T::Native>,
59+
{
60+
to_ticks(array, 1)
61+
}
62+
63+
fn millis<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
64+
where
65+
T: ArrowTemporalType + ArrowNumericType,
66+
i64: From<T::Native>,
67+
{
68+
to_ticks(array, 1_000)
69+
}
70+
71+
fn micros<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
72+
where
73+
T: ArrowTemporalType + ArrowNumericType,
74+
i64: From<T::Native>,
75+
{
76+
to_ticks(array, 1_000_000)
77+
}
78+
79+
fn nanos<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
80+
where
81+
T: ArrowTemporalType + ArrowNumericType,
82+
i64: From<T::Native>,
83+
{
84+
to_ticks(array, 1_000_000_000)
85+
}
86+
87+
fn epoch<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
88+
where
89+
T: ArrowTemporalType + ArrowNumericType,
90+
i64: From<T::Native>,
91+
{
92+
let b = match array.data_type() {
93+
DataType::Timestamp(tu, _) => {
94+
let scale = match tu {
95+
TimeUnit::Second => 1,
96+
TimeUnit::Millisecond => 1_000,
97+
TimeUnit::Microsecond => 1_000_000,
98+
TimeUnit::Nanosecond => 1_000_000_000,
99+
} as f64;
100+
array.unary(|n| {
101+
let n: i64 = n.into();
102+
n as f64 / scale
103+
})
104+
}
105+
DataType::Date32 => {
106+
let seconds_in_a_day = 86400_f64;
107+
array.unary(|n| {
108+
let n: i64 = n.into();
109+
n as f64 * seconds_in_a_day
110+
})
111+
}
112+
DataType::Date64 => array.unary(|n| {
113+
let n: i64 = n.into();
114+
n as f64 / 1_000_f64
115+
}),
116+
_ => return exec_err!("Can not convert {:?} to epoch", array.data_type()),
117+
};
118+
Ok(b)
119+
}
120+
121+
macro_rules! extract_date_part {
122+
($ARRAY: expr, $FN:expr) => {
123+
match $ARRAY.data_type() {
124+
DataType::Date32 => {
125+
let array = as_date32_array($ARRAY)?;
126+
Ok($FN(array)
127+
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
128+
}
129+
DataType::Date64 => {
130+
let array = as_date64_array($ARRAY)?;
131+
Ok($FN(array)
132+
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
133+
}
134+
DataType::Timestamp(time_unit, _) => match time_unit {
135+
TimeUnit::Second => {
136+
let array = as_timestamp_second_array($ARRAY)?;
137+
Ok($FN(array)
138+
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
139+
}
140+
TimeUnit::Millisecond => {
141+
let array = as_timestamp_millisecond_array($ARRAY)?;
142+
Ok($FN(array)
143+
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
144+
}
145+
TimeUnit::Microsecond => {
146+
let array = as_timestamp_microsecond_array($ARRAY)?;
147+
Ok($FN(array)
148+
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
149+
}
150+
TimeUnit::Nanosecond => {
151+
let array = as_timestamp_nanosecond_array($ARRAY)?;
152+
Ok($FN(array)
153+
.map(|v| cast(&(Arc::new(v) as ArrayRef), &DataType::Float64))?)
154+
}
155+
},
156+
datatype => exec_err!("Extract does not support datatype {:?}", datatype),
157+
}
158+
};
159+
}
160+
161+
#[derive(Debug)]
162+
pub struct DatePartFunc {
163+
signature: Signature,
164+
aliases: Vec<String>,
165+
}
166+
167+
impl DatePartFunc {
168+
pub fn new() -> Self {
169+
Self {
170+
signature: Signature::one_of(
171+
vec![
172+
Exact(vec![Utf8, Timestamp(Nanosecond, None)]),
173+
Exact(vec![
174+
Utf8,
175+
Timestamp(Nanosecond, Some(TIMEZONE_WILDCARD.into())),
176+
]),
177+
Exact(vec![Utf8, Timestamp(Millisecond, None)]),
178+
Exact(vec![
179+
Utf8,
180+
Timestamp(Millisecond, Some(TIMEZONE_WILDCARD.into())),
181+
]),
182+
Exact(vec![Utf8, Timestamp(Microsecond, None)]),
183+
Exact(vec![
184+
Utf8,
185+
Timestamp(Microsecond, Some(TIMEZONE_WILDCARD.into())),
186+
]),
187+
Exact(vec![Utf8, Timestamp(Second, None)]),
188+
Exact(vec![
189+
Utf8,
190+
Timestamp(Second, Some(TIMEZONE_WILDCARD.into())),
191+
]),
192+
Exact(vec![Utf8, Date64]),
193+
Exact(vec![Utf8, Date32]),
194+
],
195+
Volatility::Immutable,
196+
),
197+
aliases: vec![String::from("date_part"), String::from("datepart")],
198+
}
199+
}
200+
}
201+
202+
impl ScalarUDFImpl for DatePartFunc {
203+
fn as_any(&self) -> &dyn Any {
204+
self
205+
}
206+
207+
fn name(&self) -> &str {
208+
"date_part"
209+
}
210+
211+
fn aliases(&self) -> &[String] {
212+
&self.aliases
213+
}
214+
215+
fn signature(&self) -> &Signature {
216+
&self.signature
217+
}
218+
219+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
220+
Ok(DataType::Float64)
221+
}
222+
223+
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
224+
if args.len() != 2 {
225+
return exec_err!("Expected two arguments in DATE_PART");
226+
}
227+
let (date_part, array) = (&args[0], &args[1]);
228+
229+
let date_part =
230+
if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = date_part {
231+
v
232+
} else {
233+
return exec_err!(
234+
"First argument of `DATE_PART` must be non-null scalar Utf8"
235+
);
236+
};
237+
238+
let is_scalar = matches!(array, ColumnarValue::Scalar(_));
239+
240+
let array = match array {
241+
ColumnarValue::Array(array) => array.clone(),
242+
ColumnarValue::Scalar(scalar) => scalar.to_array()?,
243+
};
244+
245+
let arr = match date_part.to_lowercase().as_str() {
246+
"year" => extract_date_part!(&array, temporal::year),
247+
"quarter" => extract_date_part!(&array, temporal::quarter),
248+
"month" => extract_date_part!(&array, temporal::month),
249+
"week" => extract_date_part!(&array, temporal::week),
250+
"day" => extract_date_part!(&array, temporal::day),
251+
"doy" => extract_date_part!(&array, temporal::doy),
252+
"dow" => extract_date_part!(&array, temporal::num_days_from_sunday),
253+
"hour" => extract_date_part!(&array, temporal::hour),
254+
"minute" => extract_date_part!(&array, temporal::minute),
255+
"second" => extract_date_part!(&array, seconds),
256+
"millisecond" => extract_date_part!(&array, millis),
257+
"microsecond" => extract_date_part!(&array, micros),
258+
"nanosecond" => extract_date_part!(&array, nanos),
259+
"epoch" => extract_date_part!(&array, epoch),
260+
_ => exec_err!("Date part '{date_part}' not supported"),
261+
}?;
262+
263+
Ok(if is_scalar {
264+
ColumnarValue::Scalar(ScalarValue::try_from_array(&arr?, 0)?)
265+
} else {
266+
ColumnarValue::Array(arr?)
267+
})
268+
}
269+
}

datafusion/functions/src/datetime/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ use std::sync::Arc;
2222
use datafusion_expr::ScalarUDF;
2323

2424
mod common;
25+
pub mod date_part;
2526
mod to_date;
2627
mod to_timestamp;
2728
mod to_unixtime;
2829

2930
// create UDFs
3031
make_udf_function!(to_date::ToDateFunc, TO_DATE, to_date);
32+
make_udf_function!(date_part::DatePartFunc, DATE_PART, date_part);
3133
make_udf_function!(to_unixtime::ToUnixtimeFunc, TO_UNIXTIME, to_unixtime);
3234
make_udf_function!(to_timestamp::ToTimestampFunc, TO_TIMESTAMP, to_timestamp);
3335
make_udf_function!(
@@ -107,6 +109,10 @@ pub mod expr_fn {
107109
super::to_date().call(args)
108110
}
109111

112+
pub fn date_part(args: Vec<Expr>) -> Expr {
113+
super::date_part().call(args)
114+
}
115+
110116
#[doc = "converts a string and optional formats to a Unixtime"]
111117
pub fn to_unixtime(args: Vec<Expr>) -> Expr {
112118
super::to_unixtime().call(args)
@@ -142,6 +148,7 @@ pub mod expr_fn {
142148
pub fn functions() -> Vec<Arc<ScalarUDF>> {
143149
vec![
144150
to_date(),
151+
date_part(),
145152
to_unixtime(),
146153
to_timestamp(),
147154
to_timestamp_seconds(),

0 commit comments

Comments
 (0)