Skip to content

Commit

Permalink
[FLINK-35241][table] Support SQL FLOOR and CEIL functions with `S…
Browse files Browse the repository at this point in the history
…ECOND` and `MINUTE` for `TIMESTAMP_TLZ`
  • Loading branch information
hanyuzheng7 authored Dec 17, 2024
1 parent 473a3e8 commit c7d8515
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,12 @@ public static long timestampFloor(TimeUnitRange range, long ts, TimeZone tz) {
long utcTs = ts + offset;

switch (range) {
case MILLISECOND:
return floor(utcTs, 1L) - offset;
case SECOND:
return floor(utcTs, MILLIS_PER_SECOND) - offset;
case MINUTE:
return floor(utcTs, MILLIS_PER_MINUTE) - offset;
case HOUR:
return floor(utcTs, MILLIS_PER_HOUR) - offset;
case DAY:
Expand Down Expand Up @@ -1249,6 +1255,12 @@ public static long timestampCeil(TimeUnitRange range, long ts, TimeZone tz) {
long utcTs = ts + offset;

switch (range) {
case MILLISECOND:
return ceil(utcTs, 1L) - offset;
case SECOND:
return ceil(utcTs, MILLIS_PER_SECOND) - offset;
case MINUTE:
return ceil(utcTs, MILLIS_PER_MINUTE) - offset;
case HOUR:
return ceil(utcTs, MILLIS_PER_HOUR) - offset;
case DAY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class FloorCeilCallGen(
terms =>
unit match {
// for Timestamp with timezone info
case MILLENNIUM | CENTURY | DECADE | YEAR | QUARTER | MONTH | WEEK | DAY | HOUR
case MILLENNIUM | CENTURY | DECADE | YEAR | QUARTER | MONTH | WEEK | DAY | HOUR |
MINUTE | SECOND | MILLISECOND
if terms.length + 1 == method.getParameterCount &&
method.getParameterTypes()(terms.length) == classOf[TimeZone] =>
val timeZone = ctx.addReusableSessionTimeZone()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.stream.Stream;

import static org.apache.flink.table.api.DataTypes.BIGINT;
Expand All @@ -37,6 +38,7 @@
import static org.apache.flink.table.api.DataTypes.INT;
import static org.apache.flink.table.api.DataTypes.INTERVAL;
import static org.apache.flink.table.api.DataTypes.SECOND;
import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.api.DataTypes.TIME;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
Expand All @@ -47,6 +49,9 @@
/** Test time-related built-in functions. */
class TimeFunctionsITCase extends BuiltInFunctionTestBase {

private static final DateTimeFormatter TIMESTAMP_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd' 'HH:mm:ss.SSS");

@Override
Stream<TestSetSpec> getTestSetSpecs() {
return Stream.of(
Expand Down Expand Up @@ -439,8 +444,9 @@ private Stream<TestSetSpec> ceilTestCases() {
// Fractional seconds are lost
LocalTime.of(11, 22, 33),
LocalDate.of(1990, 10, 14),
LocalDateTime.of(2020, 2, 29, 1, 56, 59, 987654321))
.andDataTypes(TIME(), DATE(), TIMESTAMP())
LocalDateTime.of(2020, 2, 29, 1, 56, 59, 987654321),
LocalDateTime.of(2021, 9, 24, 9, 20, 50, 924325471))
.andDataTypes(TIME(), DATE(), TIMESTAMP(), TIMESTAMP())
.testResult(
$("f0").ceil(TimeIntervalUnit.MILLISECOND),
"CEIL(f0 TO MILLISECOND)",
Expand Down Expand Up @@ -580,7 +586,39 @@ private Stream<TestSetSpec> ceilTestCases() {
$("f2").ceil(TimeIntervalUnit.MILLENNIUM),
"CEIL(f2 TO MILLENNIUM)",
LocalDateTime.of(3001, 1, 1, 0, 0),
TIMESTAMP().nullable()));
TIMESTAMP().nullable())
.testResult(
$("f3").cast(TIMESTAMP_LTZ(3))
.ceil(TimeIntervalUnit.HOUR)
.cast(STRING()),
"CAST(CEIL(CAST(f3 AS TIMESTAMP_LTZ(3)) TO HOUR) AS STRING)",
LocalDateTime.of(2021, 9, 24, 10, 0, 0, 0)
.format(TIMESTAMP_FORMATTER),
STRING().nullable())
.testResult(
$("f3").cast(TIMESTAMP_LTZ(3))
.ceil(TimeIntervalUnit.MINUTE)
.cast(STRING()),
"CAST(CEIL(CAST(f3 AS TIMESTAMP_LTZ(3)) TO MINUTE) AS STRING)",
LocalDateTime.of(2021, 9, 24, 9, 21, 0, 0)
.format(TIMESTAMP_FORMATTER),
STRING().nullable())
.testResult(
$("f3").cast(TIMESTAMP_LTZ(3))
.ceil(TimeIntervalUnit.SECOND)
.cast(STRING()),
"CAST(CEIL(CAST(f3 AS TIMESTAMP_LTZ(3)) TO SECOND) AS STRING)",
LocalDateTime.of(2021, 9, 24, 9, 20, 51, 0)
.format(TIMESTAMP_FORMATTER),
STRING().nullable())
.testResult(
$("f3").cast(TIMESTAMP_LTZ(3))
.ceil(TimeIntervalUnit.MILLISECOND)
.cast(STRING()),
"CAST(CEIL(CAST(f3 AS TIMESTAMP_LTZ(3)) TO MILLISECOND) AS STRING)",
LocalDateTime.of(2021, 9, 24, 9, 20, 50, 924_000_000)
.format(TIMESTAMP_FORMATTER),
STRING().nullable()));
}

private Stream<TestSetSpec> floorTestCases() {
Expand Down Expand Up @@ -732,6 +770,38 @@ private Stream<TestSetSpec> floorTestCases() {
$("f2").floor(TimeIntervalUnit.MILLENNIUM),
"FLOOR(f2 TO MILLENNIUM)",
LocalDateTime.of(2001, 1, 1, 0, 0),
TIMESTAMP().nullable()));
TIMESTAMP().nullable())
.testResult(
$("f2").cast(TIMESTAMP_LTZ(3))
.floor(TimeIntervalUnit.SECOND)
.cast(STRING()),
"CAST(FLOOR(CAST(f2 AS TIMESTAMP_LTZ(3)) TO SECOND) AS STRING)",
LocalDateTime.of(2020, 2, 29, 1, 56, 59, 0)
.format(TIMESTAMP_FORMATTER),
STRING().nullable())
.testResult(
$("f2").cast(TIMESTAMP_LTZ(3))
.floor(TimeIntervalUnit.MINUTE)
.cast(STRING()),
"CAST(FLOOR(CAST(f2 AS TIMESTAMP_LTZ(3)) TO MINUTE) AS STRING)",
LocalDateTime.of(2020, 2, 29, 1, 56, 0, 0)
.format(TIMESTAMP_FORMATTER),
STRING().nullable())
.testResult(
$("f2").cast(TIMESTAMP_LTZ(3))
.floor(TimeIntervalUnit.HOUR)
.cast(STRING()),
"CAST(FLOOR(CAST(f2 AS TIMESTAMP_LTZ(3)) TO HOUR) AS STRING)",
LocalDateTime.of(2020, 2, 29, 1, 0, 0, 0)
.format(TIMESTAMP_FORMATTER),
STRING().nullable())
.testResult(
$("f2").cast(TIMESTAMP_LTZ(3))
.floor(TimeIntervalUnit.MILLISECOND)
.cast(STRING()),
"CAST(FLOOR(CAST(f2 AS TIMESTAMP_LTZ(3)) TO MILLISECOND) AS STRING)",
LocalDateTime.of(2020, 2, 29, 1, 56, 59, 987_000_000)
.format(TIMESTAMP_FORMATTER),
STRING().nullable()));
}
}

0 comments on commit c7d8515

Please sign in to comment.