Skip to content

Commit 91f3eb6

Browse files
committed
[task #5568] add to_unixtime function
Signed-off-by: tangruilin <[email protected]>
1 parent 2a490e4 commit 91f3eb6

File tree

6 files changed

+176
-1
lines changed

6 files changed

+176
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::Arc;
19+
20+
use datafusion::arrow::array::StringArray;
21+
use datafusion::arrow::datatypes::{DataType, Field, Schema};
22+
use datafusion::arrow::record_batch::RecordBatch;
23+
use datafusion::error::Result;
24+
use datafusion::prelude::*;
25+
26+
/// This example demonstrates how to use the to_unixtime
/// function in the DataFrame API as well as via sql.
#[tokio::main]
async fn main() -> Result<()> {
    // define a schema: a single non-nullable Utf8 column "a" holding
    // date/timestamp strings in several formats.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));

    // define data: RFC 3339 timestamps (with and without offset), a naive
    // timestamp, and a date-only string.
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(StringArray::from(vec![
            "2020-09-08T13:42:29Z",
            "2020-09-08T13:42:29.190855-05:00",
            "2020-08-09 12:13:29",
            "2020-01-02",
        ]))],
    )?;

    // declare a new context. In spark API, this corresponds to a new spark SQL session
    let ctx = SessionContext::new();

    // declare a table in memory. In spark API, this corresponds to createDataFrame(...).
    ctx.register_batch("t", batch)?;
    let df = ctx.table("t").await?;

    // use the to_unixtime function to convert col 'a' to seconds since the
    // Unix epoch, using the default parsing
    let df = df.with_column("a", to_unixtime(vec![col("a")]))?;

    let df = df.select_columns(&["a"])?;

    // print the results
    df.show().await?;

    Ok(())
}

datafusion/functions/src/datetime/mod.rs

+9-1
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ use datafusion_expr::ScalarUDF;
2424
mod common;
2525
mod to_date;
2626
mod to_timestamp;
27+
mod to_unixtime;
2728

2829
// create UDFs
2930
make_udf_function!(to_date::ToDateFunc, TO_DATE, to_date);
31+
make_udf_function!(to_unixtime::ToUnixtimeFunc, TO_UNIXTIME, to_unixtime);
3032
make_udf_function!(to_timestamp::ToTimestampFunc, TO_TIMESTAMP, to_timestamp);
3133
make_udf_function!(
3234
to_timestamp::ToTimestampSecondsFunc,
@@ -68,7 +70,7 @@ pub mod expr_fn {
6870
/// # use datafusion_expr::col;
6971
/// # use datafusion::prelude::*;
7072
/// # use datafusion_functions::expr_fn::to_date;
71-
///
73+
///
7274
/// // define a schema.
7375
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));
7476
///
@@ -105,6 +107,11 @@ pub mod expr_fn {
105107
super::to_date().call(args)
106108
}
107109

110+
#[doc = "converts a string and optional formats to a Unixtime"]
111+
pub fn to_unixtime(args: Vec<Expr>) -> Expr {
112+
super::to_unixtime().call(args)
113+
}
114+
108115
#[doc = "converts a string and optional formats to a `Timestamp(Nanoseconds, None)`"]
109116
pub fn to_timestamp(args: Vec<Expr>) -> Expr {
110117
super::to_timestamp().call(args)
@@ -135,6 +142,7 @@ pub mod expr_fn {
135142
pub fn functions() -> Vec<Arc<ScalarUDF>> {
136143
vec![
137144
to_date(),
145+
to_unixtime(),
138146
to_timestamp(),
139147
to_timestamp_seconds(),
140148
to_timestamp_millis(),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
20+
use arrow::datatypes::DataType;
21+
22+
use crate::datetime::common::*;
23+
use datafusion_common::{exec_err, Result};
24+
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
25+
26+
use super::to_timestamp::ToTimestampSecondsFunc;
27+
28+
/// Scalar UDF `to_unixtime`: converts its first argument (a string, numeric,
/// or date value) to an `Int64` — seconds since the Unix epoch for string
/// inputs; see `invoke` for the per-type handling.
#[derive(Debug)]
pub(super) struct ToUnixtimeFunc {
    // Variadic-any: argument count/types are validated in `invoke`.
    signature: Signature,
}
32+
33+
impl ToUnixtimeFunc {
    /// Creates the UDF with a variadic signature that accepts any argument
    /// types; actual validation is deferred to `invoke`.
    pub fn new() -> Self {
        Self {
            signature: Signature::variadic_any(Volatility::Immutable),
        }
    }
}
40+
41+
impl ScalarUDFImpl for ToUnixtimeFunc {
42+
fn as_any(&self) -> &dyn Any {
43+
self
44+
}
45+
46+
fn name(&self) -> &str {
47+
"to_unixtime"
48+
}
49+
50+
fn signature(&self) -> &Signature {
51+
&self.signature
52+
}
53+
54+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
55+
Ok(DataType::Int64)
56+
}
57+
58+
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
59+
if args.is_empty() {
60+
return exec_err!("to_date function requires 1 or more arguments, got 0");
61+
}
62+
63+
// validate that any args after the first one are Utf8
64+
if args.len() > 1 {
65+
if let Some(value) = validate_data_types(args, "to_unixtime") {
66+
return value;
67+
}
68+
}
69+
70+
match args[0].data_type() {
71+
DataType::Int32
72+
| DataType::Int64
73+
| DataType::Null
74+
| DataType::Float64
75+
| DataType::Date32
76+
| DataType::Date64 => args[0].cast_to(&DataType::Int64, None),
77+
DataType::Utf8 => ToTimestampSecondsFunc::new()
78+
.invoke(args)?
79+
.cast_to(&DataType::Int64, None),
80+
other => {
81+
exec_err!("Unsupported data type {:?} for function to_unixtime", other)
82+
}
83+
}
84+
}
85+
}

datafusion/proto/proto/datafusion.proto

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
/*
22
3+
34
* Licensed to the Apache Software Foundation (ASF) under one
45
* or more contributor license agreements. See the NOTICE file
56
* distributed with this work for additional information
@@ -684,6 +685,7 @@ enum ScalarFunction {
684685
RegexpLike = 135;
685686
ToChar = 136;
686687
/// 137 was ToDate
688+
/// 138 was ToUnixtime
687689
}
688690

689691
message ScalarFunctionNode {

datafusion/proto/src/generated/prost.rs

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/sqllogictest/test_files/timestamps.slt

+19
Original file line numberDiff line numberDiff line change
@@ -2678,3 +2678,22 @@ SELECT to_char(null, '%d-%m-%Y');
26782678

26792679
statement ok
26802680
drop table formats;
2681+
2682+
##########
2683+
## to_unixtime tests
2684+
##########
2685+
2686+
query I
2687+
select to_unixtime('2020-09-08T12:00:00+00:00');
2688+
----
2689+
1599566400
2690+
2691+
query I
2692+
select to_unixtime('01-14-2023 01:01:30+05:30', '%q', '%d-%m-%Y %H/%M/%S', '%+', '%m-%d-%Y %H:%M:%S%#z');
2693+
----
2694+
1673638290
2695+
2696+
query I
2697+
select to_unixtime('03:59:00.123456789 05-17-2023', '%c', '%+', '%H:%M:%S%.f %m-%d-%Y');
2698+
----
2699+
1684295940

0 commit comments

Comments
 (0)