From c7d8515c0285fc0019571a1f637377630c5a06fa Mon Sep 17 00:00:00 2001 From: Hanyu Zheng <135176127+hanyuzheng7@users.noreply.github.com> Date: Tue, 17 Dec 2024 07:23:38 -0600 Subject: [PATCH] [FLINK-35241][table] Support SQL `FLOOR` and `CEIL` functions with `SECOND` and `MINUTE` for `TIMESTAMP_TLZ` --- .../flink/table/utils/DateTimeUtils.java | 12 +++ .../codegen/calls/FloorCeilCallGen.scala | 3 +- .../functions/TimeFunctionsITCase.java | 78 ++++++++++++++++++- 3 files changed, 88 insertions(+), 5 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java index 3a71144fcb790..c8d0f2e731127 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java @@ -1219,6 +1219,12 @@ public static long timestampFloor(TimeUnitRange range, long ts, TimeZone tz) { long utcTs = ts + offset; switch (range) { + case MILLISECOND: + return floor(utcTs, 1L) - offset; + case SECOND: + return floor(utcTs, MILLIS_PER_SECOND) - offset; + case MINUTE: + return floor(utcTs, MILLIS_PER_MINUTE) - offset; case HOUR: return floor(utcTs, MILLIS_PER_HOUR) - offset; case DAY: @@ -1249,6 +1255,12 @@ public static long timestampCeil(TimeUnitRange range, long ts, TimeZone tz) { long utcTs = ts + offset; switch (range) { + case MILLISECOND: + return ceil(utcTs, 1L) - offset; + case SECOND: + return ceil(utcTs, MILLIS_PER_SECOND) - offset; + case MINUTE: + return ceil(utcTs, MILLIS_PER_MINUTE) - offset; case HOUR: return ceil(utcTs, MILLIS_PER_HOUR) - offset; case DAY: diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala index 74e58063cbf55..ca81aa6a54e36 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/FloorCeilCallGen.scala @@ -64,7 +64,8 @@ class FloorCeilCallGen( terms => unit match { // for Timestamp with timezone info - case MILLENNIUM | CENTURY | DECADE | YEAR | QUARTER | MONTH | WEEK | DAY | HOUR + case MILLENNIUM | CENTURY | DECADE | YEAR | QUARTER | MONTH | WEEK | DAY | HOUR | + MINUTE | SECOND | MILLISECOND if terms.length + 1 == method.getParameterCount && method.getParameterTypes()(terms.length) == classOf[TimeZone] => val timeZone = ctx.addReusableSessionTimeZone() diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java index f3b706282749f..8913d23c318d0 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java @@ -27,6 +27,7 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.format.DateTimeFormatter; import java.util.stream.Stream; import static org.apache.flink.table.api.DataTypes.BIGINT; @@ -37,6 +38,7 @@ import static org.apache.flink.table.api.DataTypes.INT; import static org.apache.flink.table.api.DataTypes.INTERVAL; import static org.apache.flink.table.api.DataTypes.SECOND; +import static org.apache.flink.table.api.DataTypes.STRING; import static org.apache.flink.table.api.DataTypes.TIME; import static org.apache.flink.table.api.DataTypes.TIMESTAMP; import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ; @@ -47,6 +49,9 @@ /** Test time-related built-in functions. */ class TimeFunctionsITCase extends BuiltInFunctionTestBase { + private static final DateTimeFormatter TIMESTAMP_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd' 'HH:mm:ss.SSS"); + @Override Stream getTestSetSpecs() { return Stream.of( @@ -439,8 +444,9 @@ private Stream ceilTestCases() { // Fractional seconds are lost LocalTime.of(11, 22, 33), LocalDate.of(1990, 10, 14), - LocalDateTime.of(2020, 2, 29, 1, 56, 59, 987654321)) - .andDataTypes(TIME(), DATE(), TIMESTAMP()) + LocalDateTime.of(2020, 2, 29, 1, 56, 59, 987654321), + LocalDateTime.of(2021, 9, 24, 9, 20, 50, 924325471)) + .andDataTypes(TIME(), DATE(), TIMESTAMP(), TIMESTAMP()) .testResult( $("f0").ceil(TimeIntervalUnit.MILLISECOND), "CEIL(f0 TO MILLISECOND)", @@ -580,7 +586,39 @@ private Stream ceilTestCases() { $("f2").ceil(TimeIntervalUnit.MILLENNIUM), "CEIL(f2 TO MILLENNIUM)", LocalDateTime.of(3001, 1, 1, 0, 0), - TIMESTAMP().nullable())); + TIMESTAMP().nullable()) + .testResult( + $("f3").cast(TIMESTAMP_LTZ(3)) + .ceil(TimeIntervalUnit.HOUR) + .cast(STRING()), + "CAST(CEIL(CAST(f3 AS TIMESTAMP_LTZ(3)) TO HOUR) AS STRING)", + LocalDateTime.of(2021, 9, 24, 10, 0, 0, 0) + .format(TIMESTAMP_FORMATTER), + STRING().nullable()) + .testResult( + $("f3").cast(TIMESTAMP_LTZ(3)) + .ceil(TimeIntervalUnit.MINUTE) + .cast(STRING()), + "CAST(CEIL(CAST(f3 AS TIMESTAMP_LTZ(3)) TO MINUTE) AS STRING)", + LocalDateTime.of(2021, 9, 24, 9, 21, 0, 0) + .format(TIMESTAMP_FORMATTER), + STRING().nullable()) + .testResult( + $("f3").cast(TIMESTAMP_LTZ(3)) + .ceil(TimeIntervalUnit.SECOND) + .cast(STRING()), + "CAST(CEIL(CAST(f3 AS TIMESTAMP_LTZ(3)) TO SECOND) AS STRING)", + LocalDateTime.of(2021, 9, 24, 9, 20, 51, 0) + .format(TIMESTAMP_FORMATTER), + STRING().nullable()) + .testResult( + $("f3").cast(TIMESTAMP_LTZ(3)) + .ceil(TimeIntervalUnit.MILLISECOND) + .cast(STRING()), + "CAST(CEIL(CAST(f3 AS TIMESTAMP_LTZ(3)) TO MILLISECOND) AS STRING)", + LocalDateTime.of(2021, 9, 24, 9, 20, 50, 924_000_000) + .format(TIMESTAMP_FORMATTER), + STRING().nullable())); } private Stream floorTestCases() { @@ -732,6 +770,38 @@ private Stream floorTestCases() { $("f2").floor(TimeIntervalUnit.MILLENNIUM), "FLOOR(f2 TO MILLENNIUM)", LocalDateTime.of(2001, 1, 1, 0, 0), - TIMESTAMP().nullable())); + TIMESTAMP().nullable()) + .testResult( + $("f2").cast(TIMESTAMP_LTZ(3)) + .floor(TimeIntervalUnit.SECOND) + .cast(STRING()), + "CAST(FLOOR(CAST(f2 AS TIMESTAMP_LTZ(3)) TO SECOND) AS STRING)", + LocalDateTime.of(2020, 2, 29, 1, 56, 59, 0) + .format(TIMESTAMP_FORMATTER), + STRING().nullable()) + .testResult( + $("f2").cast(TIMESTAMP_LTZ(3)) + .floor(TimeIntervalUnit.MINUTE) + .cast(STRING()), + "CAST(FLOOR(CAST(f2 AS TIMESTAMP_LTZ(3)) TO MINUTE) AS STRING)", + LocalDateTime.of(2020, 2, 29, 1, 56, 0, 0) + .format(TIMESTAMP_FORMATTER), + STRING().nullable()) + .testResult( + $("f2").cast(TIMESTAMP_LTZ(3)) + .floor(TimeIntervalUnit.HOUR) + .cast(STRING()), + "CAST(FLOOR(CAST(f2 AS TIMESTAMP_LTZ(3)) TO HOUR) AS STRING)", + LocalDateTime.of(2020, 2, 29, 1, 0, 0, 0) + .format(TIMESTAMP_FORMATTER), + STRING().nullable()) + .testResult( + $("f2").cast(TIMESTAMP_LTZ(3)) + .floor(TimeIntervalUnit.MILLISECOND) + .cast(STRING()), + "CAST(FLOOR(CAST(f2 AS TIMESTAMP_LTZ(3)) TO MILLISECOND) AS STRING)", + LocalDateTime.of(2020, 2, 29, 1, 56, 59, 987_000_000) + .format(TIMESTAMP_FORMATTER), + STRING().nullable())); } }