@@ -32,6 +32,7 @@ use arrow::{
32
32
TimestampMillisecondType , TimestampNanosecondType , TimestampSecondType ,
33
33
} ,
34
34
} ;
35
+ use arrow_array:: timezone:: Tz ;
35
36
use arrow_array:: {
36
37
TimestampMicrosecondArray , TimestampMillisecondArray , TimestampSecondArray ,
37
38
} ;
@@ -218,13 +219,28 @@ fn quarter_month(date: &NaiveDateTime) -> u32 {
218
219
/// epoch, for granularities greater than 1 second, in taking into
219
220
/// account that some granularities are not uniform durations of time
220
221
/// (e.g. months are not always the same lengths, leap seconds, etc)
221
- fn date_trunc_coarse ( granularity : & str , value : i64 ) -> Result < i64 > {
222
+ fn date_trunc_coarse ( granularity : & str , value : i64 , tz : Option < Arc < str > > ) -> Result < i64 > {
222
223
// Use chrono NaiveDateTime to clear the various fields
223
224
// correctly accounting for non uniform granularities
224
225
let value = timestamp_ns_to_datetime ( value) . ok_or_else ( || {
225
226
DataFusionError :: Execution ( format ! ( "Timestamp {value} out of range" ) )
226
227
} ) ?;
227
228
229
+ // convert to local time without time zone
230
+ let value = match tz {
231
+ Some ( tz) => {
232
+ let tz: Tz = tz. parse ( ) ?;
233
+ tz. offset_from_local_datetime ( & value)
234
+ . map ( |offset| DateTime :: < Tz > :: from_utc ( value + offset. fix ( ) , offset) )
235
+ . single ( )
236
+ . ok_or_else ( || {
237
+ DataFusionError :: Execution ( format ! ( "Timestamp {value} out of range" ) )
238
+ } ) ?
239
+ . naive_utc ( )
240
+ }
241
+ None => value,
242
+ } ;
243
+
228
244
let value = Some ( value) ;
229
245
230
246
let value = match granularity {
@@ -282,6 +298,7 @@ fn date_trunc_coarse(granularity: &str, value: i64) -> Result<i64> {
282
298
// truncates a single value with the given timeunit to the specified granularity
283
299
fn _date_trunc (
284
300
tu : TimeUnit ,
301
+ tz : Option < Arc < str > > ,
285
302
value : & Option < i64 > ,
286
303
granularity : & str ,
287
304
) -> Result < Option < i64 > , DataFusionError > {
@@ -297,33 +314,105 @@ fn _date_trunc(
297
314
} ;
298
315
299
316
// convert to nanoseconds
300
- let nano = date_trunc_coarse ( granularity, scale * value) ?;
317
+ let nano = date_trunc_coarse ( granularity, scale * value, tz . clone ( ) ) ?;
301
318
302
319
let result = match tu {
303
320
TimeUnit :: Second => match granularity {
304
- "minute" => Some ( nano / 1_000_000_000 / 60 * 60 ) ,
305
- _ => Some ( nano / 1_000_000_000 ) ,
321
+ "minute" => nano / 1_000_000_000 / 60 * 60 ,
322
+ _ => nano / 1_000_000_000 ,
306
323
} ,
307
324
TimeUnit :: Millisecond => match granularity {
308
- "minute" => Some ( nano / 1_000_000 / 1_000 / 60 * 1_000 * 60 ) ,
309
- "second" => Some ( nano / 1_000_000 / 1_000 * 1_000 ) ,
310
- _ => Some ( nano / 1_000_000 ) ,
325
+ "minute" => nano / 1_000_000 / 1_000 / 60 * 1_000 * 60 ,
326
+ "second" => nano / 1_000_000 / 1_000 * 1_000 ,
327
+ _ => nano / 1_000_000 ,
311
328
} ,
312
329
TimeUnit :: Microsecond => match granularity {
313
- "minute" => Some ( nano / 1_000 / 1_000_000 / 60 * 60 * 1_000_000 ) ,
314
- "second" => Some ( nano / 1_000 / 1_000_000 * 1_000_000 ) ,
315
- "millisecond" => Some ( nano / 1_000 / 1_000 * 1_000 ) ,
316
- _ => Some ( nano / 1_000 ) ,
330
+ "minute" => nano / 1_000 / 1_000_000 / 60 * 60 * 1_000_000 ,
331
+ "second" => nano / 1_000 / 1_000_000 * 1_000_000 ,
332
+ "millisecond" => nano / 1_000 / 1_000 * 1_000 ,
333
+ _ => nano / 1_000 ,
317
334
} ,
318
335
_ => match granularity {
319
- "minute" => Some ( nano / 1_000_000_000 / 60 * 1_000_000_000 * 60 ) ,
320
- "second" => Some ( nano / 1_000_000_000 * 1_000_000_000 ) ,
321
- "millisecond" => Some ( nano / 1_000_000 * 1_000_000 ) ,
322
- "microsecond" => Some ( nano / 1_000 * 1_000 ) ,
323
- _ => Some ( nano) ,
336
+ "minute" => nano / 1_000_000_000 / 60 * 1_000_000_000 * 60 ,
337
+ "second" => nano / 1_000_000_000 * 1_000_000_000 ,
338
+ "millisecond" => nano / 1_000_000 * 1_000_000 ,
339
+ "microsecond" => nano / 1_000 * 1_000 ,
340
+ _ => nano,
324
341
} ,
325
342
} ;
326
- Ok ( result)
343
+
344
+ let result = match tz {
345
+ Some ( tz) => {
346
+ let tz: Tz = tz. parse ( ) ?;
347
+ match tu {
348
+ TimeUnit :: Second => {
349
+ let value = tz. from_local_datetime (
350
+ & NaiveDateTime :: from_timestamp_opt ( result, 0 ) . unwrap ( ) ,
351
+ ) ;
352
+ let value = value
353
+ . single ( )
354
+ . ok_or_else ( || {
355
+ DataFusionError :: Execution ( "Invalid timestamp" . to_string ( ) )
356
+ } ) ?
357
+ . naive_utc ( ) ;
358
+ value. timestamp_nanos ( ) / 1_000_000_000
359
+ }
360
+
361
+ TimeUnit :: Millisecond => {
362
+ let value = tz. from_local_datetime (
363
+ & NaiveDateTime :: from_timestamp_opt (
364
+ result / 1_000 ,
365
+ ( result % 1_000 * 1_000_000 ) as u32 ,
366
+ )
367
+ . unwrap ( ) ,
368
+ ) ;
369
+ let value = value
370
+ . single ( )
371
+ . ok_or_else ( || {
372
+ DataFusionError :: Execution ( "Invalid timestamp" . to_string ( ) )
373
+ } ) ?
374
+ . naive_utc ( ) ;
375
+ value. timestamp_nanos ( ) / 1_000_000
376
+ }
377
+
378
+ TimeUnit :: Microsecond => {
379
+ let value = tz. from_local_datetime (
380
+ & NaiveDateTime :: from_timestamp_opt (
381
+ result / 1_000_000 ,
382
+ ( result % 1_000_000 * 1_000 ) as u32 ,
383
+ )
384
+ . unwrap ( ) ,
385
+ ) ;
386
+ let value = value
387
+ . single ( )
388
+ . ok_or_else ( || {
389
+ DataFusionError :: Execution ( "Invalid timestamp" . to_string ( ) )
390
+ } ) ?
391
+ . naive_utc ( ) ;
392
+ value. timestamp_nanos ( ) / 1_000
393
+ }
394
+ _ => {
395
+ let value = tz. from_local_datetime (
396
+ & NaiveDateTime :: from_timestamp_opt (
397
+ result / 1_000_000_000 ,
398
+ ( result % 1_000_000_000 ) as u32 ,
399
+ )
400
+ . unwrap ( ) ,
401
+ ) ;
402
+ let value = value
403
+ . single ( )
404
+ . ok_or_else ( || {
405
+ DataFusionError :: Execution ( "Invalid timestamp" . to_string ( ) )
406
+ } ) ?
407
+ . naive_utc ( ) ;
408
+ value. timestamp_nanos ( )
409
+ }
410
+ }
411
+ }
412
+ None => result,
413
+ } ;
414
+
415
+ Ok ( Some ( result) )
327
416
}
328
417
329
418
/// date_trunc SQL function
@@ -341,62 +430,117 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
341
430
342
431
Ok ( match array {
343
432
ColumnarValue :: Scalar ( ScalarValue :: TimestampNanosecond ( v, tz_opt) ) => {
344
- let value = _date_trunc ( TimeUnit :: Nanosecond , v, granularity. as_str ( ) ) ?;
433
+ let value = _date_trunc (
434
+ TimeUnit :: Nanosecond ,
435
+ tz_opt. clone ( ) ,
436
+ v,
437
+ granularity. as_str ( ) ,
438
+ ) ?;
439
+
345
440
let value = ScalarValue :: TimestampNanosecond ( value, tz_opt. clone ( ) ) ;
346
441
ColumnarValue :: Scalar ( value)
347
442
}
348
443
ColumnarValue :: Scalar ( ScalarValue :: TimestampMicrosecond ( v, tz_opt) ) => {
349
- let value = _date_trunc ( TimeUnit :: Microsecond , v, granularity. as_str ( ) ) ?;
444
+ let value = _date_trunc (
445
+ TimeUnit :: Microsecond ,
446
+ tz_opt. clone ( ) ,
447
+ v,
448
+ granularity. as_str ( ) ,
449
+ ) ?;
350
450
let value = ScalarValue :: TimestampMicrosecond ( value, tz_opt. clone ( ) ) ;
351
451
ColumnarValue :: Scalar ( value)
352
452
}
353
453
ColumnarValue :: Scalar ( ScalarValue :: TimestampMillisecond ( v, tz_opt) ) => {
354
- let value = _date_trunc ( TimeUnit :: Millisecond , v, granularity. as_str ( ) ) ?;
454
+ let value = _date_trunc (
455
+ TimeUnit :: Millisecond ,
456
+ tz_opt. clone ( ) ,
457
+ v,
458
+ granularity. as_str ( ) ,
459
+ ) ?;
355
460
let value = ScalarValue :: TimestampMillisecond ( value, tz_opt. clone ( ) ) ;
356
461
ColumnarValue :: Scalar ( value)
357
462
}
358
463
ColumnarValue :: Scalar ( ScalarValue :: TimestampSecond ( v, tz_opt) ) => {
359
- let value = _date_trunc ( TimeUnit :: Second , v, granularity. as_str ( ) ) ?;
464
+ let value =
465
+ _date_trunc ( TimeUnit :: Second , tz_opt. clone ( ) , v, granularity. as_str ( ) ) ?;
360
466
let value = ScalarValue :: TimestampSecond ( value, tz_opt. clone ( ) ) ;
361
467
ColumnarValue :: Scalar ( value)
362
468
}
363
469
ColumnarValue :: Array ( array) => {
364
470
let array_type = array. data_type ( ) ;
365
471
match array_type {
366
- DataType :: Timestamp ( TimeUnit :: Second , _ ) => {
472
+ DataType :: Timestamp ( TimeUnit :: Second , tz_opt ) => {
367
473
let array = as_timestamp_second_array ( array) ?;
368
474
let array = array
369
475
. iter ( )
370
- . map ( |x| _date_trunc ( TimeUnit :: Second , & x, granularity. as_str ( ) ) )
476
+ . map ( |x| {
477
+ _date_trunc (
478
+ TimeUnit :: Second ,
479
+ tz_opt. clone ( ) ,
480
+ & x,
481
+ granularity. as_str ( ) ,
482
+ )
483
+ } )
371
484
. collect :: < Result < TimestampSecondArray > > ( ) ?;
372
485
ColumnarValue :: Array ( Arc :: new ( array) )
373
486
}
374
- DataType :: Timestamp ( TimeUnit :: Millisecond , _ ) => {
487
+ DataType :: Timestamp ( TimeUnit :: Millisecond , tz_opt ) => {
375
488
let array = as_timestamp_millisecond_array ( array) ?;
376
489
let array = array
377
490
. iter ( )
378
491
. map ( |x| {
379
- _date_trunc ( TimeUnit :: Millisecond , & x, granularity. as_str ( ) )
492
+ _date_trunc (
493
+ TimeUnit :: Millisecond ,
494
+ tz_opt. clone ( ) ,
495
+ & x,
496
+ granularity. as_str ( ) ,
497
+ )
380
498
} )
381
499
. collect :: < Result < TimestampMillisecondArray > > ( ) ?;
382
500
ColumnarValue :: Array ( Arc :: new ( array) )
383
501
}
384
- DataType :: Timestamp ( TimeUnit :: Microsecond , _ ) => {
502
+ DataType :: Timestamp ( TimeUnit :: Microsecond , tz_opt ) => {
385
503
let array = as_timestamp_microsecond_array ( array) ?;
386
504
let array = array
387
505
. iter ( )
388
506
. map ( |x| {
389
- _date_trunc ( TimeUnit :: Microsecond , & x, granularity. as_str ( ) )
507
+ _date_trunc (
508
+ TimeUnit :: Microsecond ,
509
+ tz_opt. clone ( ) ,
510
+ & x,
511
+ granularity. as_str ( ) ,
512
+ )
390
513
} )
391
514
. collect :: < Result < TimestampMicrosecondArray > > ( ) ?;
392
515
ColumnarValue :: Array ( Arc :: new ( array) )
393
516
}
517
+ DataType :: Timestamp ( TimeUnit :: Nanosecond , tz_opt) => {
518
+ let array = as_timestamp_nanosecond_array ( array) ?;
519
+ let array = array
520
+ . iter ( )
521
+ . map ( |x| {
522
+ _date_trunc (
523
+ TimeUnit :: Nanosecond ,
524
+ tz_opt. clone ( ) ,
525
+ & x,
526
+ granularity. as_str ( ) ,
527
+ )
528
+ } )
529
+ . collect :: < Result < TimestampNanosecondArray > > ( ) ?;
530
+
531
+ ColumnarValue :: Array ( Arc :: new ( array) )
532
+ }
394
533
_ => {
395
534
let array = as_timestamp_nanosecond_array ( array) ?;
396
535
let array = array
397
536
. iter ( )
398
537
. map ( |x| {
399
- _date_trunc ( TimeUnit :: Nanosecond , & x, granularity. as_str ( ) )
538
+ _date_trunc (
539
+ TimeUnit :: Nanosecond ,
540
+ None ,
541
+ & x,
542
+ granularity. as_str ( ) ,
543
+ )
400
544
} )
401
545
. collect :: < Result < TimestampNanosecondArray > > ( ) ?;
402
546
@@ -990,7 +1134,7 @@ mod tests {
990
1134
cases. iter ( ) . for_each ( |( original, granularity, expected) | {
991
1135
let left = string_to_timestamp_nanos ( original) . unwrap ( ) ;
992
1136
let right = string_to_timestamp_nanos ( expected) . unwrap ( ) ;
993
- let result = date_trunc_coarse ( granularity, left) . unwrap ( ) ;
1137
+ let result = date_trunc_coarse ( granularity, left, None ) . unwrap ( ) ;
994
1138
assert_eq ! ( result, right, "{original} = {expected}" ) ;
995
1139
} ) ;
996
1140
}
0 commit comments