@@ -16,13 +16,16 @@ use std::fmt;
16
16
use std:: str:: FromStr ;
17
17
use std:: sync:: Arc ;
18
18
19
- use common_query:: error:: { self , Result , UnsupportedInputDataTypeSnafu } ;
19
+ use common_query:: error:: { InvalidFuncArgsSnafu , Result , UnsupportedInputDataTypeSnafu } ;
20
20
use common_query:: prelude:: { Signature , Volatility } ;
21
21
use common_time:: timestamp:: TimeUnit ;
22
22
use common_time:: Timestamp ;
23
23
use datatypes:: prelude:: ConcreteDataType ;
24
- use datatypes:: types:: StringType ;
25
- use datatypes:: vectors:: { Int64Vector , StringVector , Vector , VectorRef } ;
24
+ use datatypes:: types:: TimestampType ;
25
+ use datatypes:: vectors:: {
26
+ Int64Vector , StringVector , TimestampMicrosecondVector , TimestampMillisecondVector ,
27
+ TimestampNanosecondVector , TimestampSecondVector , Vector , VectorRef ,
28
+ } ;
26
29
use snafu:: ensure;
27
30
28
31
use crate :: scalars:: function:: { Function , FunctionContext } ;
@@ -42,26 +45,41 @@ fn convert_to_seconds(arg: &str) -> Option<i64> {
42
45
}
43
46
}
44
47
48
+ fn process_vector ( vector : & dyn Vector ) -> Vec < Option < i64 > > {
49
+ ( 0 ..vector. len ( ) )
50
+ . map ( |i| paste:: expr!( ( vector. get( i) ) . as_timestamp( ) . map( |ts| ts. value( ) ) ) )
51
+ . collect :: < Vec < Option < i64 > > > ( )
52
+ }
53
+
45
54
impl Function for ToUnixtimeFunction {
46
55
fn name ( & self ) -> & str {
47
56
NAME
48
57
}
49
58
50
59
fn return_type ( & self , _input_types : & [ ConcreteDataType ] ) -> Result < ConcreteDataType > {
51
- Ok ( ConcreteDataType :: timestamp_second_datatype ( ) )
60
+ Ok ( ConcreteDataType :: int64_datatype ( ) )
52
61
}
53
62
54
63
fn signature ( & self ) -> Signature {
55
- Signature :: exact (
56
- vec ! [ ConcreteDataType :: String ( StringType ) ] ,
64
+ Signature :: uniform (
65
+ 1 ,
66
+ vec ! [
67
+ ConcreteDataType :: string_datatype( ) ,
68
+ ConcreteDataType :: int32_datatype( ) ,
69
+ ConcreteDataType :: int64_datatype( ) ,
70
+ ConcreteDataType :: timestamp_second_datatype( ) ,
71
+ ConcreteDataType :: timestamp_millisecond_datatype( ) ,
72
+ ConcreteDataType :: timestamp_microsecond_datatype( ) ,
73
+ ConcreteDataType :: timestamp_nanosecond_datatype( ) ,
74
+ ] ,
57
75
Volatility :: Immutable ,
58
76
)
59
77
}
60
78
61
79
fn eval ( & self , _func_ctx : FunctionContext , columns : & [ VectorRef ] ) -> Result < VectorRef > {
62
80
ensure ! (
63
81
columns. len( ) == 1 ,
64
- error :: InvalidFuncArgsSnafu {
82
+ InvalidFuncArgsSnafu {
65
83
err_msg: format!(
66
84
"The length of the args is not correct, expect exactly one, have: {}" ,
67
85
columns. len( )
@@ -79,6 +97,42 @@ impl Function for ToUnixtimeFunction {
79
97
. collect :: < Vec < _ > > ( ) ,
80
98
) ) )
81
99
}
100
+ ConcreteDataType :: Int64 ( _) | ConcreteDataType :: Int32 ( _) => {
101
+ let array = columns[ 0 ] . to_arrow_array ( ) ;
102
+ Ok ( Arc :: new ( Int64Vector :: try_from_arrow_array ( & array) . unwrap ( ) ) )
103
+ }
104
+ ConcreteDataType :: Timestamp ( ts) => {
105
+ let array = columns[ 0 ] . to_arrow_array ( ) ;
106
+ let value = match ts {
107
+ TimestampType :: Second ( _) => {
108
+ let vector = paste:: expr!( TimestampSecondVector :: try_from_arrow_array(
109
+ array
110
+ )
111
+ . unwrap( ) ) ;
112
+ process_vector ( & vector)
113
+ }
114
+ TimestampType :: Millisecond ( _) => {
115
+ let vector = paste:: expr!(
116
+ TimestampMillisecondVector :: try_from_arrow_array( array) . unwrap( )
117
+ ) ;
118
+ process_vector ( & vector)
119
+ }
120
+ TimestampType :: Microsecond ( _) => {
121
+ let vector = paste:: expr!(
122
+ TimestampMicrosecondVector :: try_from_arrow_array( array) . unwrap( )
123
+ ) ;
124
+ process_vector ( & vector)
125
+ }
126
+ TimestampType :: Nanosecond ( _) => {
127
+ let vector = paste:: expr!( TimestampNanosecondVector :: try_from_arrow_array(
128
+ array
129
+ )
130
+ . unwrap( ) ) ;
131
+ process_vector ( & vector)
132
+ }
133
+ } ;
134
+ Ok ( Arc :: new ( Int64Vector :: from ( value) ) )
135
+ }
82
136
_ => UnsupportedInputDataTypeSnafu {
83
137
function : NAME ,
84
138
datatypes : columns. iter ( ) . map ( |c| c. data_type ( ) ) . collect :: < Vec < _ > > ( ) ,
@@ -97,28 +151,37 @@ impl fmt::Display for ToUnixtimeFunction {
97
151
#[ cfg( test) ]
98
152
mod tests {
99
153
use common_query:: prelude:: TypeSignature ;
100
- use datatypes:: prelude:: ConcreteDataType ;
101
- use datatypes:: types:: StringType ;
154
+ use datatypes:: prelude:: { ConcreteDataType , ScalarVectorBuilder } ;
155
+ use datatypes:: scalars:: ScalarVector ;
156
+ use datatypes:: timestamp:: TimestampSecond ;
102
157
use datatypes:: value:: Value ;
103
- use datatypes:: vectors:: StringVector ;
158
+ use datatypes:: vectors:: { StringVector , TimestampSecondVector } ;
104
159
105
160
use super :: { ToUnixtimeFunction , * } ;
106
161
use crate :: scalars:: Function ;
107
162
108
163
#[ test]
109
- fn test_to_unixtime ( ) {
164
+ fn test_string_to_unixtime ( ) {
110
165
let f = ToUnixtimeFunction :: default ( ) ;
111
166
assert_eq ! ( "to_unixtime" , f. name( ) ) ;
112
167
assert_eq ! (
113
- ConcreteDataType :: timestamp_second_datatype ( ) ,
168
+ ConcreteDataType :: int64_datatype ( ) ,
114
169
f. return_type( & [ ] ) . unwrap( )
115
170
) ;
116
171
117
172
assert ! ( matches!( f. signature( ) ,
118
- Signature {
119
- type_signature: TypeSignature :: Exact ( valid_types) ,
120
- volatility: Volatility :: Immutable
121
- } if valid_types == vec![ ConcreteDataType :: String ( StringType ) ]
173
+ Signature {
174
+ type_signature: TypeSignature :: Uniform ( 1 , valid_types) ,
175
+ volatility: Volatility :: Immutable
176
+ } if valid_types == vec![
177
+ ConcreteDataType :: string_datatype( ) ,
178
+ ConcreteDataType :: int32_datatype( ) ,
179
+ ConcreteDataType :: int64_datatype( ) ,
180
+ ConcreteDataType :: timestamp_second_datatype( ) ,
181
+ ConcreteDataType :: timestamp_millisecond_datatype( ) ,
182
+ ConcreteDataType :: timestamp_microsecond_datatype( ) ,
183
+ ConcreteDataType :: timestamp_nanosecond_datatype( ) ,
184
+ ]
122
185
) ) ;
123
186
124
187
let times = vec ! [
@@ -145,4 +208,106 @@ mod tests {
145
208
}
146
209
}
147
210
}
211
+
212
+ #[ test]
213
+ fn test_int_to_unixtime ( ) {
214
+ let f = ToUnixtimeFunction :: default ( ) ;
215
+ assert_eq ! ( "to_unixtime" , f. name( ) ) ;
216
+ assert_eq ! (
217
+ ConcreteDataType :: int64_datatype( ) ,
218
+ f. return_type( & [ ] ) . unwrap( )
219
+ ) ;
220
+
221
+ assert ! ( matches!( f. signature( ) ,
222
+ Signature {
223
+ type_signature: TypeSignature :: Uniform ( 1 , valid_types) ,
224
+ volatility: Volatility :: Immutable
225
+ } if valid_types == vec![
226
+ ConcreteDataType :: string_datatype( ) ,
227
+ ConcreteDataType :: int32_datatype( ) ,
228
+ ConcreteDataType :: int64_datatype( ) ,
229
+ ConcreteDataType :: timestamp_second_datatype( ) ,
230
+ ConcreteDataType :: timestamp_millisecond_datatype( ) ,
231
+ ConcreteDataType :: timestamp_microsecond_datatype( ) ,
232
+ ConcreteDataType :: timestamp_nanosecond_datatype( ) ,
233
+ ]
234
+ ) ) ;
235
+
236
+ let times = vec ! [ Some ( 3_i64 ) , None , Some ( 5_i64 ) , None ] ;
237
+ let results = vec ! [ Some ( 3 ) , None , Some ( 5 ) , None ] ;
238
+ let args: Vec < VectorRef > = vec ! [ Arc :: new( Int64Vector :: from( times. clone( ) ) ) ] ;
239
+ let vector = f. eval ( FunctionContext :: default ( ) , & args) . unwrap ( ) ;
240
+ assert_eq ! ( 4 , vector. len( ) ) ;
241
+ for ( i, _t) in times. iter ( ) . enumerate ( ) {
242
+ let v = vector. get ( i) ;
243
+ if i == 1 || i == 3 {
244
+ assert_eq ! ( Value :: Null , v) ;
245
+ continue ;
246
+ }
247
+ match v {
248
+ Value :: Int64 ( ts) => {
249
+ assert_eq ! ( ts, ( * results. get( i) . unwrap( ) ) . unwrap( ) ) ;
250
+ }
251
+ _ => unreachable ! ( ) ,
252
+ }
253
+ }
254
+ }
255
+
256
+ #[ test]
257
+ fn test_timestamp_to_unixtime ( ) {
258
+ let f = ToUnixtimeFunction :: default ( ) ;
259
+ assert_eq ! ( "to_unixtime" , f. name( ) ) ;
260
+ assert_eq ! (
261
+ ConcreteDataType :: int64_datatype( ) ,
262
+ f. return_type( & [ ] ) . unwrap( )
263
+ ) ;
264
+
265
+ assert ! ( matches!( f. signature( ) ,
266
+ Signature {
267
+ type_signature: TypeSignature :: Uniform ( 1 , valid_types) ,
268
+ volatility: Volatility :: Immutable
269
+ } if valid_types == vec![
270
+ ConcreteDataType :: string_datatype( ) ,
271
+ ConcreteDataType :: int32_datatype( ) ,
272
+ ConcreteDataType :: int64_datatype( ) ,
273
+ ConcreteDataType :: timestamp_second_datatype( ) ,
274
+ ConcreteDataType :: timestamp_millisecond_datatype( ) ,
275
+ ConcreteDataType :: timestamp_microsecond_datatype( ) ,
276
+ ConcreteDataType :: timestamp_nanosecond_datatype( ) ,
277
+ ]
278
+ ) ) ;
279
+
280
+ let times: Vec < Option < TimestampSecond > > = vec ! [
281
+ Some ( TimestampSecond :: new( 123 ) ) ,
282
+ None ,
283
+ Some ( TimestampSecond :: new( 42 ) ) ,
284
+ None ,
285
+ ] ;
286
+ let results = vec ! [ Some ( 123 ) , None , Some ( 42 ) , None ] ;
287
+ let ts_vector: TimestampSecondVector = build_vector_from_slice ( & times) ;
288
+ let args: Vec < VectorRef > = vec ! [ Arc :: new( ts_vector) ] ;
289
+ let vector = f. eval ( FunctionContext :: default ( ) , & args) . unwrap ( ) ;
290
+ assert_eq ! ( 4 , vector. len( ) ) ;
291
+ for ( i, _t) in times. iter ( ) . enumerate ( ) {
292
+ let v = vector. get ( i) ;
293
+ if i == 1 || i == 3 {
294
+ assert_eq ! ( Value :: Null , v) ;
295
+ continue ;
296
+ }
297
+ match v {
298
+ Value :: Int64 ( ts) => {
299
+ assert_eq ! ( ts, ( * results. get( i) . unwrap( ) ) . unwrap( ) ) ;
300
+ }
301
+ _ => unreachable ! ( ) ,
302
+ }
303
+ }
304
+ }
305
+
306
+ fn build_vector_from_slice < T : ScalarVector > ( items : & [ Option < T :: RefItem < ' _ > > ] ) -> T {
307
+ let mut builder = T :: Builder :: with_capacity ( items. len ( ) ) ;
308
+ for item in items {
309
+ builder. push ( * item) ;
310
+ }
311
+ builder. finish ( )
312
+ }
148
313
}
0 commit comments