Skip to content

Commit

Permalink
[Improve] Improve error message when can not parse datetime value (ap…
Browse files Browse the repository at this point in the history
…ache#7181)

* [Improve] Improve error message when can not parse datetime value

* update
  • Loading branch information
Hisoka-X authored and chaorongzhi committed Aug 21, 2024
1 parent bacdf96 commit 336c3a3
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,4 +271,18 @@ public static SeaTunnelRuntimeException writeRowErrorWithFiledsCountNotMatch(
return new SeaTunnelRuntimeException(
WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH, params);
}

public static SeaTunnelRuntimeException formatDateTimeError(String datetime, String field) {
Map<String, String> params = new HashMap<>();
params.put("datetime", datetime);
params.put("field", field);
return new SeaTunnelRuntimeException(CommonErrorCode.FORMAT_DATETIME_ERROR, params);
}

public static SeaTunnelRuntimeException formatDateError(String date, String field) {
Map<String, String> params = new HashMap<>();
params.put("date", date);
params.put("field", field);
return new SeaTunnelRuntimeException(CommonErrorCode.FORMAT_DATE_ERROR, params);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,14 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {

WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH(
"COMMON-31",
"<connector>: The source has '<sourceFieldsNum>' fields, but the table of sink has '<sinkFieldsNum>' fields. Please check schema of sink table.");
"<connector>: The source has '<sourceFieldsNum>' fields, but the table of sink has '<sinkFieldsNum>' fields. Please check schema of sink table."),
FORMAT_DATE_ERROR(
"COMMON-32",
"The date format '<date>' of field '<field>' is not supported. Please check the date format."),
FORMAT_DATETIME_ERROR(
"COMMON-33",
"The datetime format '<datetime>' of field '<field>' is not supported. Please check the datetime format."),
;

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ private LocalDate convertToLocalDate(JsonNode jsonNode, String fieldName) {
dateFormatter = DateUtils.matchDateFormatter(dateStr);
fieldFormatterMap.put(fieldName, dateFormatter);
}
if (dateFormatter == null) {
throw CommonError.formatDateError(dateStr, fieldName);
}

return dateFormatter.parse(jsonNode.asText()).query(TemporalQueries.localDate());
}
Expand All @@ -272,6 +275,9 @@ private LocalDateTime convertToLocalDateTime(JsonNode jsonNode, String fieldName
dateTimeFormatter = DateTimeUtils.matchDateTimeFormatter(datetimeStr);
fieldFormatterMap.put(fieldName, dateTimeFormatter);
}
if (dateTimeFormatter == null) {
throw CommonError.formatDateTimeError(datetimeStr, fieldName);
}

TemporalAccessor parsedTimestamp = dateTimeFormatter.parse(datetimeStr);
LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDate;
Expand Down Expand Up @@ -564,4 +566,39 @@ private void assertMapKeyType(
Map<?, ?> keyMap = (Map<?, ?>) converter.convert(keyMapNode, fieldName);
assertEquals(expect, keyMap.keySet().iterator().next());
}

@Test
public void testParseUnsupportedDateTimeFormat() throws IOException {
SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {"date_field"},
new SeaTunnelDataType<?>[] {LocalTimeType.LOCAL_DATE_TYPE});
JsonDeserializationSchema deserializationSchema =
new JsonDeserializationSchema(false, false, rowType);
String content = "{\"date_field\":\"2022-092-24\"}";
SeaTunnelRuntimeException exception =
Assertions.assertThrows(
SeaTunnelRuntimeException.class,
() -> deserializationSchema.deserialize(content.getBytes()));
Assertions.assertEquals(
"ErrorCode:[COMMON-32], ErrorDescription:[The date format '2022-092-24' of field 'date_field' is not supported. Please check the date format.]",
exception.getCause().getCause().getMessage());

SeaTunnelRowType rowType2 =
new SeaTunnelRowType(
new String[] {"timestamp_field"},
new SeaTunnelDataType<?>[] {
LocalTimeType.LOCAL_DATE_TIME_TYPE,
});
JsonDeserializationSchema deserializationSchema2 =
new JsonDeserializationSchema(false, false, rowType2);
String content2 = "{\"timestamp_field\": \"2022-09-24-22:45:00\"}";
SeaTunnelRuntimeException exception2 =
Assertions.assertThrows(
SeaTunnelRuntimeException.class,
() -> deserializationSchema2.deserialize(content2.getBytes()));
Assertions.assertEquals(
"ErrorCode:[COMMON-33], ErrorDescription:[The datetime format '2022-09-24-22:45:00' of field 'timestamp_field' is not supported. Please check the datetime format.]",
exception2.getCause().getCause().getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
Expand Down Expand Up @@ -253,11 +254,11 @@ private Object convert(
for (String kv : kvs) {
String[] splits = kv.split(separators[level + 2]);
if (splits.length < 2) {
objectMap.put(convert(splits[0], keyType, level + 1), null);
objectMap.put(convert(splits[0], keyType, level + 1, fieldName), null);
} else {
objectMap.put(
convert(splits[0], keyType, level + 1),
convert(splits[1], valueType, level + 1));
convert(splits[0], keyType, level + 1, fieldName),
convert(splits[1], valueType, level + 1, fieldName));
}
}
return objectMap;
Expand All @@ -284,29 +285,50 @@ private Object convert(
case BYTES:
return field.getBytes(StandardCharsets.UTF_8);
case DATE:
return DateUtils.parse(field, dateFormatter);
DateTimeFormatter dateFormatter = fieldFormatterMap.get(fieldName);
if (dateFormatter == null) {
dateFormatter = DateUtils.matchDateFormatter(field);
fieldFormatterMap.put(fieldName, dateFormatter);
}
if (dateFormatter == null) {
throw CommonError.formatDateError(field, fieldName);
}

return dateFormatter.parse(field).query(TemporalQueries.localDate());
case TIME:
return TimeUtils.parse(field, timeFormatter);
TemporalAccessor parsedTime = TIME_FORMAT.parse(field);
return parsedTime.query(TemporalQueries.localTime());
case TIMESTAMP:
return DateTimeUtils.parse(field, dateTimeFormatter);
DateTimeFormatter dateTimeFormatter = fieldFormatterMap.get(fieldName);
if (dateTimeFormatter == null) {
dateTimeFormatter = DateTimeUtils.matchDateTimeFormatter(field);
fieldFormatterMap.put(fieldName, dateTimeFormatter);
}
if (dateTimeFormatter == null) {
throw CommonError.formatDateTimeError(field, fieldName);
}

TemporalAccessor parsedTimestamp = dateTimeFormatter.parse(field);
LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
return LocalDateTime.of(localDate, localTime);
case ROW:
Map<Integer, String> splitsMap =
splitLineBySeaTunnelRowType(field, (SeaTunnelRowType) fieldType, level + 1);
Object[] objects = new Object[splitsMap.size()];
String[] eleFieldNames = ((SeaTunnelRowType) fieldType).getFieldNames();
for (int i = 0; i < objects.length; i++) {
objects[i] =
convert(
splitsMap.get(i),
((SeaTunnelRowType) fieldType).getFieldType(i),
level + 1);
level + 1,
fieldName + "." + eleFieldNames[i]);
}
return new SeaTunnelRow(objects);
default:
throw new SeaTunnelTextFormatException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
String.format(
"SeaTunnel not support this data type [%s]",
fieldType.getSqlType()));
throw CommonError.unsupportedDataType(
"SeaTunnel", fieldType.getSqlType().toString(), fieldName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -145,4 +146,45 @@ public void testParse() throws IOException {
Assertions.assertEquals(seaTunnelRow.getField(2), "tyrantlucifer");
Assertions.assertEquals(data, content);
}

@Test
public void testParseUnsupportedDateTimeFormat() throws IOException {
SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {"date_field"},
new SeaTunnelDataType<?>[] {LocalTimeType.LOCAL_DATE_TYPE});
TextDeserializationSchema deserializationSchema =
TextDeserializationSchema.builder()
.seaTunnelRowType(rowType)
.delimiter("\u0001")
.build();
String content = "2022-092-24";
SeaTunnelRuntimeException exception =
Assertions.assertThrows(
SeaTunnelRuntimeException.class,
() -> deserializationSchema.deserialize(content.getBytes()));
Assertions.assertEquals(
"ErrorCode:[COMMON-32], ErrorDescription:[The date format '2022-092-24' of field 'date_field' is not supported. Please check the date format.]",
exception.getMessage());

SeaTunnelRowType rowType2 =
new SeaTunnelRowType(
new String[] {"timestamp_field"},
new SeaTunnelDataType<?>[] {
LocalTimeType.LOCAL_DATE_TIME_TYPE,
});
TextDeserializationSchema deserializationSchema2 =
TextDeserializationSchema.builder()
.seaTunnelRowType(rowType2)
.delimiter("\u0001")
.build();
String content2 = "2022-09-24-22:45:00";
SeaTunnelRuntimeException exception2 =
Assertions.assertThrows(
SeaTunnelRuntimeException.class,
() -> deserializationSchema2.deserialize(content2.getBytes()));
Assertions.assertEquals(
"ErrorCode:[COMMON-33], ErrorDescription:[The datetime format '2022-09-24-22:45:00' of field 'timestamp_field' is not supported. Please check the datetime format.]",
exception2.getMessage());
}
}

0 comments on commit 336c3a3

Please sign in to comment.