Skip to content

Commit

Permalink
feat: remove unsupported type mappings
Browse files Browse the repository at this point in the history
Simplifies code and adds testing for unsupported types.
  • Loading branch information
thiagotnunes committed Oct 1, 2024
1 parent 75a65d9 commit 7f6d196
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.cloud.teleport.v2.source.reader.io.jdbc.rowmapper.ResultSetValueExtractor;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.rowmapper.ResultSetValueMapper;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.CustomSchema.TimeStampTz;
import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.CustomSchema.TimeTz;
import com.google.common.collect.ImmutableMap;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
Expand All @@ -44,24 +43,6 @@ public class PostgreSQLJdbcValueMappings implements JdbcValueMappingsProvider {
// Shared calendar set to the UTC time zone. It is handed to ResultSet#getDate and
// ResultSet#getTimestamp by the date and timestamp extractors in this class.
// NOTE(review): java.util.Calendar is mutable and this instance is shared statically; confirm the
// JDBC driver does not mutate it when extractors run concurrently.
private static final Calendar UTC_CALENDAR =
Calendar.getInstance(TimeZone.getTimeZone(ZoneOffset.UTC));

// Formatter for time values written as HH:mm:ss, optionally followed by a decimal point and 1 to
// 6 fractional-second digits, and then a mandatory offset ID. The time extractor appends
// ZoneOffset.UTC to the column text before parsing with this formatter, which is why an offset is
// always expected here.
private static final DateTimeFormatter TIME_FORMAT =
new DateTimeFormatterBuilder()
.appendPattern("HH:mm:ss")
.optionalStart()
// Fraction is optional; when present it carries a decimal point and at most 6 digits.
.appendFraction(ChronoField.NANO_OF_SECOND, 1, 6, true)
.optionalEnd()
.appendOffsetId()
.toFormatter();

// Formatter for time-with-offset values written as HH:mm:ss, optionally followed by a decimal
// point and 1 to 6 fractional-second digits, and then an offset in the "+HH:mm" pattern. The
// text "+00" is used for a zero offset. The timetz extractor parses the column text with this
// formatter as-is.
private static final DateTimeFormatter TIMETZ_FORMAT =
new DateTimeFormatterBuilder()
.appendPattern("HH:mm:ss")
.optionalStart()
// Fraction is optional; when present it carries a decimal point and at most 6 digits.
.appendFraction(ChronoField.NANO_OF_SECOND, 1, 6, true)
.optionalEnd()
.appendOffset("+HH:mm", "+00")
.toFormatter();

private static final DateTimeFormatter TIMESTAMPTZ_FORMAT =
new DateTimeFormatterBuilder()
.appendPattern("yyyy-MM-dd HH:mm:ss")
Expand Down Expand Up @@ -97,27 +78,6 @@ private static long toMicros(OffsetTime offsetTime) {
// Reads the named column as a java.sql.Date, passing UTC_CALENDAR to the JDBC call.
private static final ResultSetValueExtractor<java.sql.Date> dateExtractor =
(rs, fieldName) -> rs.getDate(fieldName, UTC_CALENDAR);

// PostgreSQL allows the time value '24:00:00'. Reading it through `java.sql.Time` rolls the
// underlying `java.sql.Date` over to the next day, which corrupts the microseconds computed from
// it. To avoid that, the column is read as text and parsed into an `OffsetTime`, with the UTC
// offset appended so the text matches TIME_FORMAT. A SQL NULL is returned as null.
private static final ResultSetValueExtractor<OffsetTime> timeExtractor =
    (rs, fieldName) -> {
      final String rawTime = rs.getString(fieldName);
      return rawTime == null ? null : OffsetTime.parse(rawTime + ZoneOffset.UTC, TIME_FORMAT);
    };

// Reads the named column as text and parses it into an `OffsetTime` using TIMETZ_FORMAT.
// A SQL NULL is returned as null.
private static final ResultSetValueExtractor<OffsetTime> timetzExtractor =
    (rs, fieldName) -> {
      final String rawTimeTz = rs.getString(fieldName);
      return rawTimeTz == null ? null : OffsetTime.parse(rawTimeTz, TIMETZ_FORMAT);
    };

// Reads the named column as a java.sql.Timestamp, passing UTC_CALENDAR to the JDBC call.
private static final ResultSetValueExtractor<java.sql.Timestamp> timestampExtractor =
(rs, fieldName) -> rs.getTimestamp(fieldName, UTC_CALENDAR);

Expand All @@ -140,20 +100,6 @@ private static long toMicros(OffsetTime offsetTime) {
// Converts the timestamp to an Instant and hands it to toMicros. The schema argument is not used.
private static final ResultSetValueMapper<java.sql.Timestamp> timestampToAvro =
(value, schema) -> toMicros(value.toInstant());

// Maps an extracted OffsetTime by delegating to toMicros(OffsetTime). The schema argument is not
// used.
private static final ResultSetValueMapper<OffsetTime> timeToAvro =
(value, schema) -> toMicros(value);

// Builds a TimeTz record from an OffsetTime: the time field holds the local time of day in
// microseconds, and the offset field holds the zone offset in milliseconds. The schema argument
// is not used; the record always follows TimeTz.SCHEMA.
private static final ResultSetValueMapper<OffsetTime> timetzToAvro =
    (value, schema) -> {
      final GenericRecordBuilder timeTzRecord = new GenericRecordBuilder(TimeTz.SCHEMA);

      final long timeOfDayMicros =
          TimeUnit.NANOSECONDS.toMicros(value.toLocalTime().toNanoOfDay());
      timeTzRecord.set(TimeTz.TIME_FIELD_NAME, timeOfDayMicros);

      final long offsetMillis = TimeUnit.SECONDS.toMillis(value.getOffset().getTotalSeconds());
      timeTzRecord.set(TimeTz.OFFSET_FIELD_NAME, offsetMillis);

      return timeTzRecord.build();
    };

private static final ResultSetValueMapper<OffsetDateTime> timestamptzToAvro =
(value, schema) ->
new GenericRecordBuilder(TimeStampTz.SCHEMA)
Expand All @@ -171,41 +117,26 @@ private static long toMicros(OffsetTime offsetTime) {
.put("BIT VARYING", Pair.of(bytesExtractor, valuePassThrough))
.put("BOOL", Pair.of(ResultSet::getBoolean, valuePassThrough))
.put("BOOLEAN", Pair.of(ResultSet::getBoolean, valuePassThrough))
.put("BOX", Pair.of(ResultSet::getString, valuePassThrough))
.put("BYTEA", Pair.of(bytesExtractor, valuePassThrough))
.put("CHAR", Pair.of(ResultSet::getString, valuePassThrough))
.put("CHARACTER", Pair.of(ResultSet::getString, valuePassThrough))
.put("CHARACTER VARYING", Pair.of(ResultSet::getString, valuePassThrough))
.put("CIDR", Pair.of(ResultSet::getString, valuePassThrough))
.put("CIRCLE", Pair.of(ResultSet::getString, valuePassThrough))
.put("CITEXT", Pair.of(ResultSet::getString, valuePassThrough))
.put("DATE", Pair.of(dateExtractor, dateToAvro))
.put("DECIMAL", Pair.of(ResultSet::getObject, numericToAvro))
.put("DOUBLE PRECISION", Pair.of(ResultSet::getDouble, valuePassThrough))
.put("ENUM", Pair.of(ResultSet::getString, valuePassThrough))
.put("FLOAT4", Pair.of(ResultSet::getFloat, valuePassThrough))
.put("FLOAT8", Pair.of(ResultSet::getDouble, valuePassThrough))
.put("INET", Pair.of(ResultSet::getString, valuePassThrough))
.put("INT", Pair.of(ResultSet::getInt, valuePassThrough))
.put("INTEGER", Pair.of(ResultSet::getInt, valuePassThrough))
// TODO(thiagotnunes): INTERVAL
.put("INT2", Pair.of(ResultSet::getInt, valuePassThrough))
.put("INT4", Pair.of(ResultSet::getInt, valuePassThrough))
.put("INT8", Pair.of(ResultSet::getLong, valuePassThrough))
.put("JSON", Pair.of(ResultSet::getString, valuePassThrough))
.put("JSONB", Pair.of(ResultSet::getString, valuePassThrough))
.put("LINE", Pair.of(ResultSet::getString, valuePassThrough))
.put("LSEG", Pair.of(ResultSet::getString, valuePassThrough))
.put("MACADDR", Pair.of(ResultSet::getString, valuePassThrough))
.put("MACADDR8", Pair.of(ResultSet::getString, valuePassThrough))
.put("MONEY", Pair.of(ResultSet::getDouble, valuePassThrough))
.put("NUMERIC", Pair.of(ResultSet::getObject, numericToAvro))
.put("OID", Pair.of(ResultSet::getLong, valuePassThrough))
.put("PATH", Pair.of(ResultSet::getString, valuePassThrough))
.put("PG_LSN", Pair.of(ResultSet::getString, valuePassThrough))
.put("PG_SNAPSHOT", Pair.of(ResultSet::getString, valuePassThrough))
.put("POINT", Pair.of(ResultSet::getString, valuePassThrough))
.put("POLYGON", Pair.of(ResultSet::getString, valuePassThrough))
.put("REAL", Pair.of(ResultSet::getFloat, valuePassThrough))
.put("SERIAL", Pair.of(ResultSet::getInt, valuePassThrough))
.put("SERIAL2", Pair.of(ResultSet::getInt, valuePassThrough))
Expand All @@ -214,21 +145,13 @@ private static long toMicros(OffsetTime offsetTime) {
.put("SMALLINT", Pair.of(ResultSet::getInt, valuePassThrough))
.put("SMALLSERIAL", Pair.of(ResultSet::getInt, valuePassThrough))
.put("TEXT", Pair.of(ResultSet::getString, valuePassThrough))
.put("TIME", Pair.of(timeExtractor, timeToAvro))
.put("TIMETZ", Pair.of(timetzExtractor, timetzToAvro))
.put("TIMESTAMP", Pair.of(timestampExtractor, timestampToAvro))
.put("TIMESTAMPTZ", Pair.of(timestamptzExtractor, timestamptzToAvro))
.put("TIME WITH TIME ZONE", Pair.of(timetzExtractor, timetzToAvro))
.put("TIME WITHOUT TIME ZONE", Pair.of(timeExtractor, timeToAvro))
.put("TIMESTAMP WITH TIME ZONE", Pair.of(timestamptzExtractor, timestamptzToAvro))
.put("TIMESTAMP WITHOUT TIME ZONE", Pair.of(timestampExtractor, timestampToAvro))
.put("TSQUERY", Pair.of(ResultSet::getString, valuePassThrough))
.put("TSVECTOR", Pair.of(ResultSet::getString, valuePassThrough))
.put("TXID_SNAPSHOT", Pair.of(ResultSet::getString, valuePassThrough))
.put("UUID", Pair.of(ResultSet::getString, valuePassThrough))
.put("VARBIT", Pair.of(bytesExtractor, valuePassThrough))
.put("VARCHAR", Pair.of(ResultSet::getString, valuePassThrough))
.put("XML", Pair.of(ResultSet::getString, valuePassThrough))
.build()
.entrySet()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public final class PostgreSQLMappingProvider {
.put("BIT VARYING", UnifiedMappingProvider.Type.BYTES)
.put("BOOL", UnifiedMappingProvider.Type.BOOLEAN)
.put("BOOLEAN", UnifiedMappingProvider.Type.BOOLEAN)
.put("BOX", UnifiedMappingProvider.Type.UNSUPPORTED)
.put("BYTEA", UnifiedMappingProvider.Type.BYTES)
// TODO(thiagotnunes): Refine mapping type according to
// https://cloud.google.com/datastream/docs/unified-types#map-psql (if there is a limit
Expand All @@ -52,36 +51,27 @@ public final class PostgreSQLMappingProvider {
// https://cloud.google.com/datastream/docs/unified-types#map-psql (if there is a limit
// for length we should use varchar instead)
.put("CHARACTER VARYING", UnifiedMappingProvider.Type.STRING)
.put("CIDR", UnifiedMappingProvider.Type.STRING)
.put("CIRCLE", UnifiedMappingProvider.Type.UNSUPPORTED)
.put("CITEXT", UnifiedMappingProvider.Type.STRING)
.put("DATE", UnifiedMappingProvider.Type.DATE)
// TODO(thiagotnunes): Refine mapping type according to
// https://cloud.google.com/datastream/docs/unified-types#map-psql (if precision and scale
// are both >= 0, map to DECIMAL)
.put("DECIMAL", UnifiedMappingProvider.Type.NUMBER)
.put("DOUBLE PRECISION", UnifiedMappingProvider.Type.DOUBLE)
.put("ENUM", UnifiedMappingProvider.Type.STRING)
.put("FLOAT4", UnifiedMappingProvider.Type.FLOAT)
.put("FLOAT8", UnifiedMappingProvider.Type.DOUBLE)
.put("INET", UnifiedMappingProvider.Type.STRING)
.put("INT", UnifiedMappingProvider.Type.INTEGER)
.put("INTEGER", UnifiedMappingProvider.Type.INTEGER)
// TODO(thiagotnunes): INTERVAL
.put("INT2", UnifiedMappingProvider.Type.INTEGER)
.put("INT4", UnifiedMappingProvider.Type.INTEGER)
.put("INT8", UnifiedMappingProvider.Type.LONG)
.put("JSON", UnifiedMappingProvider.Type.JSON)
.put("JSONB", UnifiedMappingProvider.Type.JSON)
.put("LINE", UnifiedMappingProvider.Type.UNSUPPORTED)
.put("LSEG", UnifiedMappingProvider.Type.UNSUPPORTED)
.put("MACADDR", UnifiedMappingProvider.Type.STRING)
.put("MACADDR8", UnifiedMappingProvider.Type.STRING)
.put("MONEY", UnifiedMappingProvider.Type.DOUBLE)
// https://cloud.google.com/datastream/docs/unified-types#map-psql (if there is a
// precision and scale are >= 0, map to DECIMAL)
.put("NUMERIC", UnifiedMappingProvider.Type.NUMBER)
.put("OID", UnifiedMappingProvider.Type.LONG)
.put("PATH", UnifiedMappingProvider.Type.UNSUPPORTED)
.put("PG_LSN", UnifiedMappingProvider.Type.UNSUPPORTED)
.put("PG_SNAPSHOT", UnifiedMappingProvider.Type.UNSUPPORTED)
.put("POINT", UnifiedMappingProvider.Type.UNSUPPORTED)
.put("POLYGON", UnifiedMappingProvider.Type.UNSUPPORTED)
.put("REAL", UnifiedMappingProvider.Type.FLOAT)
.put("SERIAL", UnifiedMappingProvider.Type.INTEGER)
.put("SERIAL2", UnifiedMappingProvider.Type.INTEGER)
Expand All @@ -93,24 +83,17 @@ public final class PostgreSQLMappingProvider {
// https://cloud.google.com/datastream/docs/unified-types#map-psql (if there is a limit
// for length we should use varchar instead)
.put("TEXT", UnifiedMappingProvider.Type.STRING)
.put("TIME", UnifiedMappingProvider.Type.TIME)
.put("TIMETZ", UnifiedMappingProvider.Type.TIME_WITH_TIME_ZONE)
.put("TIMESTAMP", UnifiedMappingProvider.Type.TIMESTAMP)
.put("TIMESTAMPTZ", UnifiedMappingProvider.Type.TIMESTAMP_WITH_TIME_ZONE)
.put("TIME WITH TIME ZONE", UnifiedMappingProvider.Type.TIME_WITH_TIME_ZONE)
.put("TIME WITHOUT TIME ZONE", UnifiedMappingProvider.Type.TIME)
.put("TIMESTAMP WITH TIME ZONE", UnifiedMappingProvider.Type.TIMESTAMP_WITH_TIME_ZONE)
.put("TIMESTAMP WITHOUT TIME ZONE", UnifiedMappingProvider.Type.TIMESTAMP)
.put("TSQUERY", UnifiedMappingProvider.Type.STRING)
.put("TSVECTOR", UnifiedMappingProvider.Type.STRING)
.put("TXID_SNAPSHOT", UnifiedMappingProvider.Type.STRING)
.put("UUID", UnifiedMappingProvider.Type.STRING)
.put("VARBIT", UnifiedMappingProvider.Type.BYTES)
// TODO(thiagotnunes): Refine mapping type according to
// https://cloud.google.com/datastream/docs/unified-types#map-psql (if there is a limit
// for length we should use varchar instead)
.put("VARCHAR", UnifiedMappingProvider.Type.STRING)
.put("XML", UnifiedMappingProvider.Type.STRING)
.put("UNSUPPORTED", UnifiedMappingProvider.Type.UNSUPPORTED)
.build()
.entrySet()
.stream()
Expand Down
Loading

0 comments on commit 7f6d196

Please sign in to comment.