diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java index 4aec9d22114..782a071d011 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java @@ -271,4 +271,18 @@ public static SeaTunnelRuntimeException writeRowErrorWithFiledsCountNotMatch( return new SeaTunnelRuntimeException( WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH, params); } + + public static SeaTunnelRuntimeException formatDateTimeError(String datetime, String field) { + Map params = new HashMap<>(); + params.put("datetime", datetime); + params.put("field", field); + return new SeaTunnelRuntimeException(CommonErrorCode.FORMAT_DATETIME_ERROR, params); + } + + public static SeaTunnelRuntimeException formatDateError(String date, String field) { + Map params = new HashMap<>(); + params.put("date", date); + params.put("field", field); + return new SeaTunnelRuntimeException(CommonErrorCode.FORMAT_DATE_ERROR, params); + } } diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java index f51c983456e..58939248482 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java @@ -70,7 +70,14 @@ public enum CommonErrorCode implements SeaTunnelErrorCode { WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH( "COMMON-31", - ": The source has '' fields, but the table of sink has '' fields. Please check schema of sink table."); + ": The source has '' fields, but the table of sink has '' fields. Please check schema of sink table."), + FORMAT_DATE_ERROR( + "COMMON-32", + "The date format '' of field '' is not supported. Please check the date format."), + FORMAT_DATETIME_ERROR( + "COMMON-33", + "The datetime format '' of field '' is not supported. Please check the datetime format."), + ; private final String code; private final String description; diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java index 9bb7554ad3f..23676513232 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java @@ -256,6 +256,9 @@ private LocalDate convertToLocalDate(JsonNode jsonNode, String fieldName) { dateFormatter = DateUtils.matchDateFormatter(dateStr); fieldFormatterMap.put(fieldName, dateFormatter); } + if (dateFormatter == null) { + throw CommonError.formatDateError(dateStr, fieldName); + } return dateFormatter.parse(jsonNode.asText()).query(TemporalQueries.localDate()); } @@ -272,6 +275,9 @@ private LocalDateTime convertToLocalDateTime(JsonNode jsonNode, String fieldName dateTimeFormatter = DateTimeUtils.matchDateTimeFormatter(datetimeStr); fieldFormatterMap.put(fieldName, dateTimeFormatter); } + if (dateTimeFormatter == null) { + throw CommonError.formatDateTimeError(datetimeStr, fieldName); + } TemporalAccessor parsedTimestamp = dateTimeFormatter.parse(datetimeStr); LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime()); diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java index 1ec184845d4..ff1bb820056 100644 --- a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java +++ b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/JsonRowDataSerDeSchemaTest.java @@ -37,8 +37,10 @@ import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.math.BigDecimal; import java.sql.Timestamp; import java.time.LocalDate; @@ -564,4 +566,39 @@ private void assertMapKeyType( Map keyMap = (Map) converter.convert(keyMapNode, fieldName); assertEquals(expect, keyMap.keySet().iterator().next()); } + + @Test + public void testParseUnsupportedDateTimeFormat() throws IOException { + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"date_field"}, + new SeaTunnelDataType[] {LocalTimeType.LOCAL_DATE_TYPE}); + JsonDeserializationSchema deserializationSchema = + new JsonDeserializationSchema(false, false, rowType); + String content = "{\"date_field\":\"2022-092-24\"}"; + SeaTunnelRuntimeException exception = + Assertions.assertThrows( + SeaTunnelRuntimeException.class, + () -> deserializationSchema.deserialize(content.getBytes())); + Assertions.assertEquals( + "ErrorCode:[COMMON-32], ErrorDescription:[The date format '2022-092-24' of field 'date_field' is not supported. Please check the date format.]", + exception.getCause().getCause().getMessage()); + + SeaTunnelRowType rowType2 = + new SeaTunnelRowType( + new String[] {"timestamp_field"}, + new SeaTunnelDataType[] { + LocalTimeType.LOCAL_DATE_TIME_TYPE, + }); + JsonDeserializationSchema deserializationSchema2 = + new JsonDeserializationSchema(false, false, rowType2); + String content2 = "{\"timestamp_field\": \"2022-09-24-22:45:00\"}"; + SeaTunnelRuntimeException exception2 = + Assertions.assertThrows( + SeaTunnelRuntimeException.class, + () -> deserializationSchema2.deserialize(content2.getBytes())); + Assertions.assertEquals( + "ErrorCode:[COMMON-33], ErrorDescription:[The datetime format '2022-09-24-22:45:00' of field 'timestamp_field' is not supported. Please check the datetime format.]", + exception2.getCause().getCause().getMessage()); + } } diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java index dfde5683d68..8c06a0e68c4 100644 --- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java +++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.DateUtils; @@ -289,6 +290,9 @@ private Object convert( dateFormatter = DateUtils.matchDateFormatter(field); fieldFormatterMap.put(fieldName, dateFormatter); } + if (dateFormatter == null) { + throw CommonError.formatDateError(field, fieldName); + } return dateFormatter.parse(field).query(TemporalQueries.localDate()); case TIME: @@ -300,6 +304,9 @@ private Object convert( dateTimeFormatter = DateTimeUtils.matchDateTimeFormatter(field); fieldFormatterMap.put(fieldName, dateTimeFormatter); } + if (dateTimeFormatter == null) { + throw CommonError.formatDateTimeError(field, fieldName); + } TemporalAccessor parsedTimestamp = dateTimeFormatter.parse(field); LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime()); @@ -320,11 +327,8 @@ private Object convert( } return new SeaTunnelRow(objects); default: - throw new SeaTunnelTextFormatException( - CommonErrorCode.UNSUPPORTED_DATA_TYPE, - String.format( - "SeaTunnel not support this data type [%s]", - fieldType.getSqlType())); + throw CommonError.unsupportedDataType( + "SeaTunnel", fieldType.getSqlType().toString(), fieldName); } } } diff --git a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java index 57e99d49b69..45574392d23 100644 --- a/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java +++ b/seatunnel-formats/seatunnel-format-text/src/test/java/org/apache/seatunnel/format/text/TextFormatSchemaTest.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -145,4 +146,45 @@ public void testParse() throws IOException { Assertions.assertEquals(seaTunnelRow.getField(2), "tyrantlucifer"); Assertions.assertEquals(data, content); } + + @Test + public void testParseUnsupportedDateTimeFormat() throws IOException { + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"date_field"}, + new SeaTunnelDataType[] {LocalTimeType.LOCAL_DATE_TYPE}); + TextDeserializationSchema deserializationSchema = + TextDeserializationSchema.builder() + .seaTunnelRowType(rowType) + .delimiter("\u0001") + .build(); + String content = "2022-092-24"; + SeaTunnelRuntimeException exception = + Assertions.assertThrows( + SeaTunnelRuntimeException.class, + () -> deserializationSchema.deserialize(content.getBytes())); + Assertions.assertEquals( + "ErrorCode:[COMMON-32], ErrorDescription:[The date format '2022-092-24' of field 'date_field' is not supported. Please check the date format.]", + exception.getMessage()); + + SeaTunnelRowType rowType2 = + new SeaTunnelRowType( + new String[] {"timestamp_field"}, + new SeaTunnelDataType[] { + LocalTimeType.LOCAL_DATE_TIME_TYPE, + }); + TextDeserializationSchema deserializationSchema2 = + TextDeserializationSchema.builder() + .seaTunnelRowType(rowType2) + .delimiter("\u0001") + .build(); + String content2 = "2022-09-24-22:45:00"; + SeaTunnelRuntimeException exception2 = + Assertions.assertThrows( + SeaTunnelRuntimeException.class, + () -> deserializationSchema2.deserialize(content2.getBytes())); + Assertions.assertEquals( + "ErrorCode:[COMMON-33], ErrorDescription:[The datetime format '2022-09-24-22:45:00' of field 'timestamp_field' is not supported. Please check the datetime format.]", + exception2.getMessage()); + } }