From 7cee0ba437b58e044483be7206dbf846eca084ca Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Tue, 19 Nov 2024 09:30:41 -0800 Subject: [PATCH] Bulk load CDK: test refactors (#48488) Co-authored-by: Johnny Schmidt --- airbyte-cdk/bulk/core/load/build.gradle | 3 + .../MockBasicFunctionalityIntegrationTest.kt | 4 +- .../io/airbyte/cdk/load/data/AirbyteValue.kt | 5 +- .../load/data/AirbyteValueIdentityMapper.kt | 50 ++-- .../cdk/load/data/NullOutOfRangeIntegers.kt | 28 +++ .../load/data/SchemalessTypesToJsonString.kt | 1 + .../cdk/load/data/TimeStringToInteger.kt | 23 +- .../cdk/load/data/json/JsonToAirbyteValue.kt | 19 +- .../cdk/load/message/DestinationMessage.kt | 8 +- .../message/DestinationMessageDeserializer.kt | 4 +- .../io/airbyte/cdk/load/util/JsonUtils.kt | 42 +++- .../load/command/DestinationCatalogTest.kt | 8 +- .../data/AirbyteValueIdentityMapperTest.kt | 7 +- .../cdk/load/data/MapperPipelineTest.kt | 2 +- .../load/data/NullOutOfRangeIntegersTest.kt | 85 +++++++ .../json/JsonSchemaToAirbyteSchemaTypeTest.kt | 10 +- .../load/data/json/JsonToAirbyteValueTest.kt | 9 +- .../load/message/DestinationMessageTest.kt | 42 ++-- .../implementor/ProcessRecordsTaskTest.kt | 5 +- .../io/airbyte/cdk/load/spec/SpecTest.kt | 6 +- .../AirbyteValueWithMetaToOutputRecord.kt | 6 +- .../cdk/load/test/util/IntegrationTest.kt | 7 +- .../cdk/load/test/util/RecordDiffer.kt | 49 +++- .../DockerizedDestination.kt | 10 +- .../NonDockerizedDestination.kt | 4 +- .../BasicFunctionalityIntegrationTest.kt | 226 +++++++++--------- .../load/data/avro/AirbyteTypeToAvroSchema.kt | 45 ++-- .../data/avro/AirbyteValueToAvroRecord.kt | 2 +- .../data/avro/AvroMapperPipelineFactory.kt | 2 + .../data/avro/AvroRecordToAirbyteValue.kt | 33 ++- .../cdk/load/data/csv/CsvRowToAirbyteValue.kt | 109 +++++---- .../ObjectStoragePathFactory.kt | 28 ++- .../ObjectStorageDestinationStateManager.kt | 17 +- .../ObjectStorageStreamLoaderFactory.kt | 1 - .../ObjectStorageDestinationStateTest.kt | 3 +- .../io/airbyte/cdk/load/MockPathFactory.kt | 4 +- .../parquet/ParquetMapperPipelineFactory.kt | 2 + .../cdk/load/file/parquet/ParquetWriter.kt | 4 +- .../destination-s3-v2/metadata.yaml | 2 +- ...2CsvAssumeRoleDestinationAcceptanceTest.kt | 2 + .../destination/s3_v2/S3V2WriteTest.kt | 66 ++--- 41 files changed, 613 insertions(+), 370 deletions(-) create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/NullOutOfRangeIntegers.kt create mode 100644 airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/NullOutOfRangeIntegersTest.kt diff --git a/airbyte-cdk/bulk/core/load/build.gradle b/airbyte-cdk/bulk/core/load/build.gradle index 7157392b170b..06836bda3532 100644 --- a/airbyte-cdk/bulk/core/load/build.gradle +++ b/airbyte-cdk/bulk/core/load/build.gradle @@ -24,6 +24,9 @@ dependencies { testImplementation("io.mockk:mockk:1.13.12") implementation "org.jetbrains.kotlin:kotlin-reflect:2.0.20" testFixturesImplementation "uk.org.webcompere:system-stubs-jupiter:2.1.7" + + implementation 'com.fasterxml.jackson.module:jackson-module-kotlin' + implementation 'com.fasterxml.jackson.module:jackson-module-afterburner' } def integrationTestTask = tasks.register('integrationTest', Test) { diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt index 874489a1d463..b40faaebb11e 100644 --- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt @@ -90,7 +90,7 @@ class MockBasicFunctionalityIntegrationTest : } @Test - override fun testAllTypes() { - super.testAllTypes() + override fun testBasicTypes() { + super.testBasicTypes() } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt index dcd03e59b259..45b92d3e0fba 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt @@ -6,6 +6,7 @@ package io.airbyte.cdk.load.data import com.fasterxml.jackson.databind.JsonNode import java.math.BigDecimal +import java.math.BigInteger import java.time.LocalDate import java.time.LocalDateTime import java.time.LocalTime @@ -23,6 +24,7 @@ sealed interface AirbyteValue { is Boolean -> BooleanValue(value) is Int -> IntegerValue(value.toLong()) is Long -> IntegerValue(value) + is BigInteger -> IntegerValue(value) is Double -> NumberValue(BigDecimal.valueOf(value)) is BigDecimal -> NumberValue(value) is LocalDate -> DateValue(value.toString()) @@ -60,7 +62,8 @@ value class BooleanValue(val value: Boolean) : AirbyteValue, Comparable { +value class IntegerValue(val value: BigInteger) : AirbyteValue, Comparable { + constructor(value: Long) : this(BigInteger.valueOf(value)) override fun compareTo(other: IntegerValue): Int = value.compareTo(other.value) } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt index 66beb07b5bff..c5576389e133 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapper.kt @@ -41,6 +41,17 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper { it.first to it.second.changes.toList() } + fun nulledOut( + schema: AirbyteType, + context: Context, + reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR + ): Pair { + context.changes.add( + DestinationRecord.Change(context.path.joinToString("."), Change.NULLED, reason) + ) + return mapInner(NullValue, schema, context) + } + fun mapInner( value: AirbyteValue, schema: AirbyteType, @@ -69,32 +80,23 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper { is NumberType -> mapNumber(value as NumberValue, context) is StringType -> mapString(value as StringValue, context) is IntegerType -> mapInteger(value as IntegerValue, context) - is DateType -> mapDate(value as DateValue, context) + is DateType -> mapDate(value, context) is TimeTypeWithTimezone -> mapTimeWithTimezone( - value as TimeValue, + value, context, ) is TimeTypeWithoutTimezone -> mapTimeWithoutTimezone( - value as TimeValue, + value, context, ) - is TimestampTypeWithTimezone -> - mapTimestampWithTimezone(value as TimestampValue, context) - is TimestampTypeWithoutTimezone -> - mapTimestampWithoutTimezone(value as TimestampValue, context) + is TimestampTypeWithTimezone -> mapTimestampWithTimezone(value, context) + is TimestampTypeWithoutTimezone -> mapTimestampWithoutTimezone(value, context) is UnknownType -> mapUnknown(value as UnknownValue, context) } } catch (e: Exception) { - context.changes.add( - DestinationRecord.Change( - context.path.joinToString("."), - Change.NULLED, - Reason.DESTINATION_SERIALIZATION_ERROR - ) - ) - mapInner(NullValue, schema, context) + nulledOut(schema, context) } open fun mapObject( @@ -171,24 +173,30 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper { open fun mapInteger(value: IntegerValue, context: Context): Pair = value to context - open fun mapDate(value: DateValue, context: Context): Pair = + /** + * Time types are only allowed to be strings on the wire, but can be Int/egerValue if passed + * through [TimeStringToInteger]. + */ + open fun mapDate(value: AirbyteValue, context: Context): Pair = value to context - open fun mapTimeWithTimezone(value: TimeValue, context: Context): Pair = - value to context + open fun mapTimeWithTimezone( + value: AirbyteValue, + context: Context + ): Pair = value to context open fun mapTimeWithoutTimezone( - value: TimeValue, + value: AirbyteValue, context: Context ): Pair = value to context open fun mapTimestampWithTimezone( - value: TimestampValue, + value: AirbyteValue, context: Context ): Pair = value to context open fun mapTimestampWithoutTimezone( - value: TimestampValue, + value: AirbyteValue, context: Context ): Pair = value to context diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/NullOutOfRangeIntegers.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/NullOutOfRangeIntegers.kt new file mode 100644 index 000000000000..f89d5eb9799e --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/NullOutOfRangeIntegers.kt @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.data + +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange +import java.math.BigInteger + +/** + * Mapper for nulling out integers that are out of range. The default behavior is to null out + * integers that are outside the range of a 64-bit signed integer. + */ +class NullOutOfRangeIntegers( + private val minValue: BigInteger = Long.MIN_VALUE.toBigInteger(), + private val maxValue: BigInteger = Long.MAX_VALUE.toBigInteger() +) : AirbyteValueIdentityMapper() { + override fun mapInteger(value: IntegerValue, context: Context): Pair { + if (value.value < minValue || value.value > maxValue) { + return nulledOut( + IntegerType, + context, + AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION + ) + } + return super.mapInteger(value, context) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt index 93446f0d0c1b..64d15a6002c7 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/SchemalessTypesToJsonString.kt @@ -12,6 +12,7 @@ class SchemalessTypesToJsonString : AirbyteSchemaIdentityMapper { override fun mapObjectWithEmptySchema(schema: ObjectTypeWithEmptySchema): AirbyteType = StringType override fun mapArrayWithoutSchema(schema: ArrayTypeWithoutSchema): AirbyteType = StringType + override fun mapUnknown(schema: UnknownType): AirbyteType = StringType } class SchemalessValuesToJsonString : AirbyteValueIdentityMapper() { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringToInteger.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringToInteger.kt index faee3d57e8d3..49bbb774d515 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringToInteger.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringToInteger.kt @@ -29,7 +29,8 @@ class TimeStringToInteger : AirbyteValueIdentityMapper() { ) } - override fun mapDate(value: DateValue, context: Context): Pair { + override fun mapDate(value: AirbyteValue, context: Context): Pair { + value as DateValue val epochDay = LocalDate.parse(value.value, DATE_TIME_FORMATTER).toEpochDay() return IntValue(epochDay.toInt()) to context } @@ -53,14 +54,16 @@ class TimeStringToInteger : AirbyteValueIdentityMapper() { } override fun mapTimeWithTimezone( - value: TimeValue, + value: AirbyteValue, context: Context - ): Pair = IntegerValue(toMicrosOfDay(value.value)) to context + ): Pair = + IntegerValue(toMicrosOfDay((value as TimeValue).value)) to context override fun mapTimeWithoutTimezone( - value: TimeValue, + value: AirbyteValue, context: Context - ): Pair = IntegerValue(toMicrosOfDay(value.value)) to context + ): Pair = + IntegerValue(toMicrosOfDay((value as TimeValue).value)) to context private fun toEpochMicrosWithTimezone(timestampString: String): Long { val zdt = ZonedDateTime.parse(timestampString, DATE_TIME_FORMATTER) @@ -83,11 +86,13 @@ class TimeStringToInteger : AirbyteValueIdentityMapper() { } override fun mapTimestampWithTimezone( - value: TimestampValue, + value: AirbyteValue, context: Context - ): Pair = IntegerValue(toEpochMicros(value.value)) to context + ): Pair = + IntegerValue(toEpochMicros((value as TimestampValue).value)) to context override fun mapTimestampWithoutTimezone( - value: TimestampValue, + value: AirbyteValue, context: Context - ): Pair = IntegerValue(toEpochMicros(value.value)) to context + ): Pair = + IntegerValue(toEpochMicros((value as TimestampValue).value)) to context } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt index 85c87043b022..8f08f3017007 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt @@ -6,8 +6,9 @@ package io.airbyte.cdk.load.data.json import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.load.data.* -import io.airbyte.cdk.util.Jsons +import io.airbyte.cdk.load.util.serializeToString import java.math.BigDecimal +import java.math.BigInteger /** * Converts from json to airbyte value, performing the minimum validation necessary to marshal to a @@ -71,7 +72,7 @@ class JsonToAirbyteValue { return if (json.isTextual) { StringValue(json.asText()) } else { - StringValue(Jsons.writeValueAsString(json)) + StringValue(json.serializeToString()) } } @@ -90,10 +91,10 @@ class JsonToAirbyteValue { private fun toInteger(json: JsonNode): IntegerValue { val longVal = when { - json.isBoolean -> if (json.asBoolean()) 1L else 0L - json.isIntegralNumber -> json.asLong() - json.isFloatingPointNumber -> json.asDouble().toLong() - json.isTextual -> json.asText().toLong() + json.isBoolean -> if (json.asBoolean()) BigInteger.ONE else BigInteger.ZERO + json.isIntegralNumber -> json.bigIntegerValue() + json.isFloatingPointNumber -> json.bigIntegerValue() + json.isTextual -> json.asText().toBigInteger() else -> throw IllegalArgumentException("Could not convert $json to Integer") } return IntegerValue(longVal) @@ -103,8 +104,8 @@ class JsonToAirbyteValue { val numVal = when { json.isBoolean -> BigDecimal(if (json.asBoolean()) 1.0 else 0.0) - json.isIntegralNumber -> json.asLong().toBigDecimal() - json.isFloatingPointNumber -> json.asDouble().toBigDecimal() + json.isIntegralNumber -> json.decimalValue() + json.isFloatingPointNumber -> json.decimalValue() json.isTextual -> json.asText().toBigDecimal() else -> throw IllegalArgumentException("Could not convert $json to Number") } @@ -150,7 +151,7 @@ class JsonToAirbyteValue { return convert(json, option) } - private fun fromJson(json: JsonNode): AirbyteValue { + fun fromJson(json: JsonNode): AirbyteValue { return when { json.isBoolean -> toBoolean(json) json.isIntegralNumber -> toInteger(json) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt index b98e74071233..c07523406605 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt @@ -14,7 +14,7 @@ import io.airbyte.cdk.load.data.json.AirbyteValueToJson import io.airbyte.cdk.load.data.json.JsonToAirbyteValue import io.airbyte.cdk.load.message.CheckpointMessage.Checkpoint import io.airbyte.cdk.load.message.CheckpointMessage.Stats -import io.airbyte.protocol.models.Jsons +import io.airbyte.cdk.load.util.deserializeToNode import io.airbyte.protocol.models.v0.AirbyteGlobalState import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteRecordMessage @@ -62,7 +62,7 @@ data class DestinationRecord( changes: MutableList = mutableListOf(), ) : this( stream = DestinationStream.Descriptor(namespace, name), - data = JsonToAirbyteValue().convert(Jsons.deserialize(data), ObjectTypeWithoutSchema), + data = JsonToAirbyteValue().convert(data.deserializeToNode(), ObjectTypeWithoutSchema), emittedAtMs = emittedAtMs, meta = Meta(changes), serialized = "", @@ -287,7 +287,7 @@ data class StreamCheckpoint( ) : this( Checkpoint( DestinationStream.Descriptor(streamNamespace, streamName), - state = Jsons.deserialize(blob) + state = blob.deserializeToNode() ), Stats(sourceRecordCount), destinationRecordCount?.let { Stats(it) }, @@ -318,7 +318,7 @@ data class GlobalCheckpoint( blob: String, sourceRecordCount: Long, ) : this( - state = Jsons.deserialize(blob), + state = blob.deserializeToNode(), Stats(sourceRecordCount), additionalProperties = emptyMap(), ) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageDeserializer.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageDeserializer.kt index 916d3274e669..a25526654803 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageDeserializer.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessageDeserializer.kt @@ -4,7 +4,7 @@ package io.airbyte.cdk.load.message -import io.airbyte.cdk.util.Jsons +import io.airbyte.cdk.load.util.deserializeToClass import io.airbyte.protocol.models.v0.AirbyteMessage import jakarta.inject.Singleton @@ -22,7 +22,7 @@ class DefaultDestinationMessageDeserializer(private val messageFactory: Destinat override fun deserialize(serialized: String): DestinationMessage { try { - val airbyteMessage = Jsons.readValue(serialized, AirbyteMessage::class.java) + val airbyteMessage = serialized.deserializeToClass(AirbyteMessage::class.java) return messageFactory.fromAirbyteMessage(airbyteMessage, serialized) } catch (t: Throwable) { throw RuntimeException("Failed to deserialize AirbyteMessage") diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/util/JsonUtils.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/util/JsonUtils.kt index 050e32c918bb..7fea7a48bd7b 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/util/JsonUtils.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/util/JsonUtils.kt @@ -4,17 +4,55 @@ package io.airbyte.cdk.load.util +import com.fasterxml.jackson.annotation.JsonInclude +import com.fasterxml.jackson.core.JsonGenerator +import com.fasterxml.jackson.core.StreamReadConstraints +import com.fasterxml.jackson.databind.DeserializationFeature import com.fasterxml.jackson.databind.JsonNode -import io.airbyte.cdk.util.Jsons +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import com.fasterxml.jackson.module.afterburner.AfterburnerModule +import com.fasterxml.jackson.module.kotlin.registerKotlinModule +import java.io.InputStream -fun JsonNode.serializeToString(): String { +object Jsons : ObjectMapper() { + // allow jackson to deserialize anything under 100 MiB + // (the default, at time of writing 2024-05-29, with jackson 2.15.2, is 20 MiB) + private const val JSON_MAX_LENGTH = 100 * 1024 * 1024 + + init { + registerKotlinModule() + registerModule(JavaTimeModule()) + registerModule(AfterburnerModule()) + setSerializationInclusion(JsonInclude.Include.NON_NULL) + configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true) + configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true) + factory.setStreamReadConstraints( + StreamReadConstraints.builder().maxStringLength(JSON_MAX_LENGTH).build() + ) + } +} + +fun T.serializeToString(): String { return Jsons.writeValueAsString(this) } +fun InputStream.readIntoClass(klass: Class): T = + Jsons.readTree(this).let { Jsons.treeToValue(it, klass) } + +fun T.deserializeToPrettyPrintedString(): String { + return Jsons.writerWithDefaultPrettyPrinter().writeValueAsString(this) +} + fun String.deserializeToNode(): JsonNode { return Jsons.readTree(this) } +fun String.deserializeToClass(klass: Class): T { + return Jsons.readValue(this, klass) +} + fun Any.serializeToJsonBytes(): ByteArray { return Jsons.writeValueAsBytes(this) } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/command/DestinationCatalogTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/command/DestinationCatalogTest.kt index c21f0775ffe0..5f39b6296188 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/command/DestinationCatalogTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/command/DestinationCatalogTest.kt @@ -4,7 +4,7 @@ package io.airbyte.cdk.load.command -import io.airbyte.protocol.models.Jsons +import io.airbyte.cdk.load.util.deserializeToNode import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream @@ -26,7 +26,7 @@ class DestinationCatalogTest { .withDestinationSyncMode(DestinationSyncMode.APPEND) .withStream( AirbyteStream() - .withJsonSchema(Jsons.deserialize("""{"type": "object"}""")) + .withJsonSchema("""{"type": "object"}""".deserializeToNode()) .withNamespace("namespace1") .withName("name1") ), @@ -37,7 +37,7 @@ class DestinationCatalogTest { .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP) .withStream( AirbyteStream() - .withJsonSchema(Jsons.deserialize("""{"type": "object"}""")) + .withJsonSchema("""{"type": "object"}""".deserializeToNode()) .withNamespace("namespace2") .withName("name2") ) @@ -50,7 +50,7 @@ class DestinationCatalogTest { .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) .withStream( AirbyteStream() - .withJsonSchema(Jsons.deserialize("""{"type": "object"}""")) + .withJsonSchema("""{"type": "object"}""".deserializeToNode()) .withNamespace("namespace3") .withName("name3") ), diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt index ea720208746f..ea9f17d5e480 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueIdentityMapperTest.kt @@ -49,12 +49,7 @@ class AirbyteValueIdentityMapperTest { val (inputValues, inputSchema, _) = ValueTestBuilder>() .with(StringValue("a"), StringType) - .with( - TimestampValue("2021-01-01T12:00:00Z"), - TimeTypeWithTimezone, - nameOverride = "bad", - nullable = true - ) + .with(IntegerValue(1000), BooleanType, nameOverride = "bad", nullable = true) .build() val mapper = AirbyteValueIdentityMapper() val (values, changes) = mapper.map(inputValues, inputSchema) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/MapperPipelineTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/MapperPipelineTest.kt index 25767ef0c4f7..45ea0cbca2c5 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/MapperPipelineTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/MapperPipelineTest.kt @@ -38,7 +38,7 @@ class MapperPipelineTest { value: IntegerValue, context: Context ): Pair { - if (value.value == 2L) { + if (value.value.toLong() == 2L) { throw IllegalStateException("Arbitrarily reject 2") } return StringValue(value.value.toString()) to context diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/NullOutOfRangeIntegersTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/NullOutOfRangeIntegersTest.kt new file mode 100644 index 000000000000..97414f9f6cf4 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/NullOutOfRangeIntegersTest.kt @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.data + +import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.test.util.Root +import io.airbyte.cdk.load.test.util.ValueTestBuilder +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Change +import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason +import java.math.BigInteger +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +class NullOutOfRangeIntegersTest { + @Test + fun testDefaultBehavior() { + val (valueIn, schemaIn, expectedValue) = + ValueTestBuilder() + .with(IntegerValue(150), IntegerType, nullable = true) + .with( + IntegerValue(BigInteger("123456789012345678901234567890")), + IntegerType, + NullValue, + nameOverride = "big_integer", + nullable = true + ) + .build() + val (actualValue, changes) = NullOutOfRangeIntegers().map(valueIn, schemaIn) + Assertions.assertEquals(expectedValue, actualValue) + Assertions.assertEquals(1, changes.size) + Assertions.assertEquals( + DestinationRecord.Change( + "big_integer", + Change.NULLED, + Reason.DESTINATION_FIELD_SIZE_LIMITATION, + ), + changes[0] + ) + } + + @Test + fun testRestrictiveBehavior() { + val minValue = BigInteger("100") + val maxValue = BigInteger("200") + val (valueIn, schemaIn, expectedValue) = + ValueTestBuilder() + .with(IntegerValue(150), IntegerType, nullable = true) + .with( + IntegerValue(10), + IntegerType, + NullValue, + nameOverride = "too_small", + nullable = true + ) + .with( + IntegerValue(300), + IntegerType, + NullValue, + nameOverride = "too_big", + nullable = true + ) + .build() + val (actualValue, changes) = + NullOutOfRangeIntegers(minValue, maxValue).map(valueIn, schemaIn) + Assertions.assertEquals(expectedValue, actualValue) + Assertions.assertEquals( + setOf( + DestinationRecord.Change( + "too_small", + Change.NULLED, + Reason.DESTINATION_FIELD_SIZE_LIMITATION, + ), + DestinationRecord.Change( + "too_big", + Change.NULLED, + Reason.DESTINATION_FIELD_SIZE_LIMITATION, + ), + changes[1] + ), + changes.toSet() + ) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt index caaf37576975..4591a6258e57 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonSchemaToAirbyteSchemaTypeTest.kt @@ -22,7 +22,7 @@ import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone import io.airbyte.cdk.load.data.TimestampTypeWithTimezone import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone import io.airbyte.cdk.load.data.UnionType -import io.airbyte.cdk.util.Jsons +import io.airbyte.cdk.load.util.deserializeToNode import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test @@ -228,8 +228,7 @@ class JsonSchemaToAirbyteSchemaTypeTest { @Test fun testHandleNonstandardFields() { val inputSchema = - Jsons.readTree( - """ + """ { "type": [ "string", @@ -238,8 +237,9 @@ class JsonSchemaToAirbyteSchemaTypeTest { "description": "foo", "some_random_other_property": "lol, lmao, isn't jsonschema great" } - """.trimIndent() - ) as ObjectNode + """ + .trimIndent() + .deserializeToNode() as ObjectNode val airbyteType = JsonSchemaToAirbyteType().convert(inputSchema) Assertions.assertEquals(UnionType.of(StringType, IntegerType), airbyteType) } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValueTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValueTest.kt index 00a4415ff3fd..423b357b898a 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValueTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValueTest.kt @@ -28,8 +28,9 @@ import io.airbyte.cdk.load.data.TimeValue import io.airbyte.cdk.load.data.TimestampTypeWithTimezone import io.airbyte.cdk.load.data.TimestampValue import io.airbyte.cdk.load.data.UnionType -import io.airbyte.cdk.util.Jsons +import io.airbyte.cdk.load.util.deserializeToNode import java.math.BigDecimal +import java.math.BigInteger import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test @@ -56,7 +57,7 @@ class JsonToAirbyteValueTest { val value = JsonToAirbyteValue().convert(JsonNodeFactory.instance.numberNode(42), IntegerType) Assertions.assertTrue(value is IntegerValue) - Assertions.assertEquals(42, (value as IntegerValue).value) + Assertions.assertEquals(BigInteger.valueOf(42), (value as IntegerValue).value) } @Test @@ -148,7 +149,7 @@ class JsonToAirbyteValueTest { UnionType.of(StringType, IntegerType) ) Assertions.assertTrue(intValue is IntegerValue) - Assertions.assertEquals(42, (intValue as IntegerValue).value) + Assertions.assertEquals(BigInteger.valueOf(42), (intValue as IntegerValue).value) } @Test @@ -185,7 +186,7 @@ class JsonToAirbyteValueTest { val value = JsonToAirbyteValue() .convert( - Jsons.readTree("""{"foo": 1}"""), + """{"foo": 1}""".deserializeToNode(), ObjectType( properties = linkedMapOf( diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/DestinationMessageTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/DestinationMessageTest.kt index b75ca727ee4d..fd6fc3b15ed3 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/DestinationMessageTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/message/DestinationMessageTest.kt @@ -8,7 +8,9 @@ import io.airbyte.cdk.load.command.Append import io.airbyte.cdk.load.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema -import io.airbyte.protocol.models.Jsons +import io.airbyte.cdk.load.util.deserializeToClass +import io.airbyte.cdk.load.util.deserializeToNode +import io.airbyte.cdk.load.util.serializeToString import io.airbyte.protocol.models.v0.AirbyteGlobalState import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteRecordMessage @@ -47,7 +49,7 @@ class DestinationMessageTest { factory: DestinationMessageFactory, message: AirbyteMessage ): DestinationMessage { - val serialized = Jsons.serialize(message) + val serialized = message.serializeToString() return factory.fromAirbyteMessage( // We have to set some stuff in additionalProperties, so force the protocol model back // to a serialized representation and back. @@ -56,7 +58,7 @@ class DestinationMessageTest { // whereas converting to JSON yields `{"foo": 12}`, which then deserializes back out // as `Int?`. // Fortunately, the protocol models are (by definition) round-trippable through JSON. - Jsons.deserialize(serialized, AirbyteMessage::class.java), + serialized.deserializeToClass(AirbyteMessage::class.java), serialized, ) } @@ -101,14 +103,13 @@ class DestinationMessageTest { // we represent the state message ID as a long, but jackson sees that 1234 can be Int, // and Int(1234) != Long(1234). (and additionalProperties is just a Map) // So we just compare the serialized protocol messages. - Jsons.serialize( - inputMessage.also { - it.state.destinationStats = AirbyteStateStats().withRecordCount(3.0) - } - ), - Jsons.serialize( - parsedMessage.withDestinationStats(CheckpointMessage.Stats(3)).asProtocolMessage() - ), + inputMessage + .also { it.state.destinationStats = AirbyteStateStats().withRecordCount(3.0) } + .serializeToString(), + parsedMessage + .withDestinationStats(CheckpointMessage.Stats(3)) + .asProtocolMessage() + .serializeToString() ) } @@ -139,21 +140,20 @@ class DestinationMessageTest { val parsedMessage = convert(factory(false), inputMessage) as GlobalCheckpoint Assertions.assertEquals( - Jsons.serialize( - inputMessage.also { - it.state.destinationStats = AirbyteStateStats().withRecordCount(3.0) - } - ), - Jsons.serialize( - parsedMessage.withDestinationStats(CheckpointMessage.Stats(3)).asProtocolMessage() - ), + inputMessage + .also { it.state.destinationStats = AirbyteStateStats().withRecordCount(3.0) } + .serializeToString(), + parsedMessage + .withDestinationStats(CheckpointMessage.Stats(3)) + .asProtocolMessage() + .serializeToString() ) } companion object { private val descriptor = DestinationStream.Descriptor("namespace", "name") - private val blob1 = Jsons.deserialize("""{"foo": "bar"}""") - private val blob2 = Jsons.deserialize("""{"foo": "bar"}""") + private val blob1 = """{"foo": "bar"}""".deserializeToNode() + private val blob2 = """{"foo": "bar"}""".deserializeToNode() @JvmStatic fun roundTrippableMessages(): List = diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTaskTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTaskTest.kt index cd47cf70d206..1bbcbee8dd1e 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTaskTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/implementor/ProcessRecordsTaskTest.kt @@ -74,7 +74,10 @@ class ProcessRecordsTaskTest { // To demonstrate that the primed data was actually processed val (sum, count) = records.asSequence().fold(SumAndCount()) { acc, record -> - SumAndCount(acc.sum + (record.data as IntegerValue).value, acc.count + 1) + SumAndCount( + acc.sum + (record.data as IntegerValue).value.toLong(), + acc.count + 1 + ) } return MockBatch( state = Batch.State.COMPLETE, diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/spec/SpecTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/spec/SpecTest.kt index d2dd15ae9541..25bac2cc200c 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/spec/SpecTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/spec/SpecTest.kt @@ -17,7 +17,8 @@ import io.airbyte.cdk.load.test.util.FakeDataDumper import io.airbyte.cdk.load.test.util.IntegrationTest import io.airbyte.cdk.load.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper -import io.airbyte.cdk.util.Jsons +import io.airbyte.cdk.load.util.Jsons +import io.airbyte.cdk.load.util.deserializeToPrettyPrintedString import io.airbyte.protocol.models.v0.AirbyteMessage import java.nio.file.Files import java.nio.file.Path @@ -73,8 +74,7 @@ abstract class SpecTest : ) val spec = specMessages.first().spec - val actualSpecPrettyPrint: String = - Jsons.writerWithDefaultPrettyPrinter().writeValueAsString(spec) + val actualSpecPrettyPrint: String = spec.deserializeToPrettyPrintedString() Files.write(expectedSpecPath, actualSpecPrettyPrint.toByteArray()) val jsonMatcher: JsonMatcher = diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/AirbyteValueWithMetaToOutputRecord.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/AirbyteValueWithMetaToOutputRecord.kt index 1a010a8dbd9b..2a20bb47fd5c 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/AirbyteValueWithMetaToOutputRecord.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/AirbyteValueWithMetaToOutputRecord.kt @@ -29,15 +29,17 @@ class AirbyteValueWithMetaToOutputRecord { (value.values[DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT] as IntegerValue) .value + .toLong() ), loadedAt = null, data = value.values[DestinationRecord.Meta.COLUMN_NAME_DATA] as ObjectValue, generationId = (value.values[DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID] as IntegerValue) - .value, + .value + .toLong(), airbyteMeta = OutputRecord.Meta( - syncId = (meta.values["sync_id"] as IntegerValue).value, + syncId = (meta.values["sync_id"] as IntegerValue).value.toLong(), changes = (meta.values["changes"] as ArrayValue) .values diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt index 05eda51b1341..a51ddc3e78a7 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/IntegrationTest.kt @@ -45,6 +45,8 @@ abstract class IntegrationTest( val destinationCleaner: DestinationCleaner, val recordMangler: ExpectedRecordMapper = NoopExpectedRecordMapper, val nameMapper: NameMapper = NoopNameMapper, + /** See [RecordDiffer.nullEqualsUnset]. */ + val nullEqualsUnset: Boolean = false, ) { // Intentionally don't inject the actual destination process - we need a full factory // because some tests want to run multiple syncs, so we need to run the destination @@ -95,8 +97,9 @@ abstract class IntegrationTest( canonicalExpectedRecords.map { recordMangler.mapRecord(it) } RecordDiffer( - primaryKey.map { nameMapper.mapFieldName(it) }, - cursor?.let { nameMapper.mapFieldName(it) }, + primaryKey = primaryKey.map { nameMapper.mapFieldName(it) }, + cursor = cursor?.let { nameMapper.mapFieldName(it) }, + nullEqualsUnset = nullEqualsUnset, ) .diffRecords(expectedRecords, actualRecords) ?.let { diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt index c9310efe66f1..a4765f493887 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt @@ -11,6 +11,8 @@ import io.airbyte.cdk.load.data.NullValue import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.data.TimeValue import io.airbyte.cdk.load.data.TimestampValue +import io.airbyte.cdk.load.data.UnknownValue +import io.airbyte.cdk.load.data.json.JsonToAirbyteValue import java.time.LocalDate import java.time.LocalDateTime import java.time.LocalTime @@ -30,6 +32,15 @@ class RecordDiffer( val primaryKey: List> = emptyList(), /** The path to the cursor from a record, or null if the stream has no cursor. */ val cursor: List? = null, + /** + * Many destinations (e.g. SQL destinations with a JSON column type) can distinguish between a + * value being explicitly null, vs being unset. E.g. postgres `"null" :: jsonb` vs `null :: + * jsonb`, or plain JSONL files `{"foo": null}` vs `{}`. + * + * Set this parameter to true for destinations which do not support this distinction (e.g. Avro + * files). + */ + val nullEqualsUnset: Boolean = false, ) { private fun extract(data: Map, path: List): AirbyteValue { return when (path.size) { @@ -220,17 +231,22 @@ class RecordDiffer( val expectedPresent: Boolean = expectedRecord.data.values.containsKey(key) val actualPresent: Boolean = actualRecord.data.values.containsKey(key) if (expectedPresent && !actualPresent) { - // The expected record contained this key, but the actual record was missing - // this key. - diff.append( - "$key: Expected ${expectedRecord.data.values[key]}, but was \n" - ) + if (!nullEqualsUnset || expectedRecord.data.values[key] !is NullValue) { + // The expected record contained this key, but the actual record was missing + // this key. + diff.append( + "$key: Expected ${expectedRecord.data.values[key]}, but was \n" + ) + } } else if (!expectedPresent && actualPresent) { - // The expected record didn't contain this key, but the actual record contained - // this key. - diff.append( - "$key: Expected , but was ${actualRecord.data.values[key]}\n" - ) + if (!nullEqualsUnset || actualRecord.data.values[key] !is NullValue) { + // The expected record didn't contain this key, but the actual record + // contained + // this key. + diff.append( + "$key: Expected , but was ${actualRecord.data.values[key]}\n" + ) + } } else if (expectedPresent && actualPresent) { // The expected and actual records both contain this key. // Compare the values for equality. @@ -262,6 +278,19 @@ class RecordDiffer( ?: 0) private fun compare(v1: AirbyteValue, v2: AirbyteValue): Int { + if (v1 is UnknownValue) { + return compare( + JsonToAirbyteValue().fromJson(v1.value), + v2, + ) + } + if (v2 is UnknownValue) { + return compare( + v1, + JsonToAirbyteValue().fromJson(v2.value), + ) + } + // when comparing values of different types, just sort by their class name. // in theory, we could check for numeric types and handle them smartly... // that's a lot of work though diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt index 07ab1ea0eb4f..78431d73631c 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt @@ -5,8 +5,10 @@ package io.airbyte.cdk.load.test.util.destination_process import io.airbyte.cdk.command.FeatureFlag +import io.airbyte.cdk.load.util.deserializeToClass +import io.airbyte.cdk.load.util.serializeToJsonBytes +import io.airbyte.cdk.load.util.serializeToString import io.airbyte.cdk.output.BufferingOutputConsumer -import io.airbyte.cdk.util.Jsons import io.airbyte.protocol.models.v0.AirbyteLogMessage import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog @@ -125,7 +127,7 @@ class DockerizedDestination( cmd.add("destination_$paramName.json") } configContents?.let { addInput("config", it.toByteArray(Charsets.UTF_8)) } - catalog?.let { addInput("catalog", Jsons.writeValueAsBytes(catalog)) } + catalog?.let { addInput("catalog", catalog.serializeToJsonBytes()) } logger.info { "Executing command: ${cmd.joinToString(" ")}" } process = ProcessBuilder(cmd).start() @@ -143,7 +145,7 @@ class DockerizedDestination( val line = destinationStdout.nextLine() val message = try { - Jsons.readValue(line, AirbyteMessage::class.java) + line.deserializeToClass(AirbyteMessage::class.java) } catch (e: Exception) { // If a destination logs non-json output, just echo it getMdcScope().use { logger.info { line } } @@ -205,7 +207,7 @@ class DockerizedDestination( } override fun sendMessage(message: AirbyteMessage) { - destinationStdin.write(Jsons.writeValueAsString(message)) + destinationStdin.write(message.serializeToString()) destinationStdin.newLine() } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt index 626eb0948704..09b8bf85aaba 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/NonDockerizedDestination.kt @@ -8,7 +8,7 @@ import io.airbyte.cdk.ConnectorUncleanExitException import io.airbyte.cdk.command.CliRunnable import io.airbyte.cdk.command.CliRunner import io.airbyte.cdk.command.FeatureFlag -import io.airbyte.protocol.models.Jsons +import io.airbyte.cdk.load.util.serializeToString import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import java.io.PipedInputStream @@ -74,7 +74,7 @@ class NonDockerizedDestination( } override fun sendMessage(message: AirbyteMessage) { - destinationStdinPipe.println(Jsons.serialize(message)) + destinationStdinPipe.println(message.serializeToString()) } override fun readMessages(): List = destination.results.newMessages() diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt index 9f2902de7163..9acbe9c602c7 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt @@ -47,11 +47,12 @@ import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.test.util.NoopNameMapper import io.airbyte.cdk.load.test.util.OutputRecord import io.airbyte.cdk.load.test.util.destination_process.DestinationUncleanExitException -import io.airbyte.cdk.util.Jsons +import io.airbyte.cdk.load.util.deserializeToNode import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange import io.airbyte.protocol.models.v0.AirbyteStateMessage import java.math.BigDecimal +import java.math.BigInteger import java.time.LocalDate import java.time.LocalDateTime import java.time.LocalTime @@ -83,6 +84,8 @@ data class StronglyTyped( val topLevelFloatLosesPrecision: Boolean = true, /** Whether floats nested inside objects/arrays are represented as float64. */ val nestedFloatLosesPrecision: Boolean = true, + /** Whether the destination supports integers larger than int64 */ + val integerCanBeLarge: Boolean = true, ) : AllTypesBehavior data object Untyped : AllTypesBehavior @@ -124,7 +127,8 @@ abstract class BasicFunctionalityIntegrationTest( */ val commitDataIncrementally: Boolean, val allTypesBehavior: AllTypesBehavior, -) : IntegrationTest(dataDumper, destinationCleaner, recordMangler, nameMapper) { + nullEqualsUnset: Boolean = false, +) : IntegrationTest(dataDumper, destinationCleaner, recordMangler, nameMapper, nullEqualsUnset) { val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, configContents) @Test @@ -351,13 +355,13 @@ abstract class BasicFunctionalityIntegrationTest( when (streamName) { "test_stream1" -> { assertEquals( - Jsons.readTree("""{"foo": "bar1"}"""), + """{"foo": "bar1"}""".deserializeToNode(), stateMessage.stream.streamState, ) } "test_stream2" -> { assertEquals( - Jsons.readTree("""{"foo": "bar2"}"""), + """{"foo": "bar2"}""".deserializeToNode(), stateMessage.stream.streamState ) } @@ -678,8 +682,8 @@ abstract class BasicFunctionalityIntegrationTest( configContents, stream1, listOf( - makeInputRecord(1, "2024-01-23T01:00Z", 100), - makeInputRecord(2, "2024-01-23T01:00Z", 100), + makeInputRecord(1, "2024-01-23T01:00:00Z", 100), + makeInputRecord(2, "2024-01-23T01:00:00Z", 100), ), ) dumpAndDiffRecords( @@ -687,14 +691,14 @@ abstract class BasicFunctionalityIntegrationTest( listOf( makeOutputRecord( id = 1, - updatedAt = "2024-01-23T01:00Z", + updatedAt = "2024-01-23T01:00:00Z", extractedAt = 100, generationId = 41, syncId = 41, ), makeOutputRecord( id = 2, - updatedAt = "2024-01-23T01:00Z", + updatedAt = "2024-01-23T01:00:00Z", extractedAt = 100, generationId = 41, syncId = 41, @@ -717,7 +721,7 @@ abstract class BasicFunctionalityIntegrationTest( runSync( configContents, stream2, - listOf(makeInputRecord(1, "2024-01-23T02:00Z", 200)), + listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)), streamStatus = null, ) } @@ -726,14 +730,14 @@ abstract class BasicFunctionalityIntegrationTest( listOfNotNull( makeOutputRecord( id = 1, - updatedAt = "2024-01-23T01:00Z", + updatedAt = "2024-01-23T01:00:00Z", extractedAt = 100, generationId = 41, syncId = 41, ), makeOutputRecord( id = 2, - updatedAt = "2024-01-23T01:00Z", + updatedAt = "2024-01-23T01:00:00Z", extractedAt = 100, generationId = 41, syncId = 41, @@ -741,7 +745,7 @@ abstract class BasicFunctionalityIntegrationTest( if (commitDataIncrementally) { makeOutputRecord( id = 1, - updatedAt = "2024-01-23T02:00Z", + updatedAt = "2024-01-23T02:00:00Z", extractedAt = 200, generationId = 42, syncId = 42, @@ -761,21 +765,21 @@ abstract class BasicFunctionalityIntegrationTest( runSync( configContents, stream2, - listOf(makeInputRecord(2, "2024-01-23T03:00Z", 300)), + listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)), ) dumpAndDiffRecords( parsedConfig, listOf( makeOutputRecord( id = 1, - updatedAt = "2024-01-23T02:00Z", + updatedAt = "2024-01-23T02:00:00Z", extractedAt = 200, generationId = 42, syncId = 42, ), makeOutputRecord( id = 2, - updatedAt = "2024-01-23T03:00Z", + updatedAt = "2024-01-23T03:00:00Z", extractedAt = 300, generationId = 42, syncId = 42, @@ -842,7 +846,7 @@ abstract class BasicFunctionalityIntegrationTest( runSync( configContents, stream, - listOf(makeInputRecord(1, "2024-01-23T02:00Z", 200)), + listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)), streamStatus = null, ) } @@ -852,7 +856,7 @@ abstract class BasicFunctionalityIntegrationTest( listOf( makeOutputRecord( id = 1, - updatedAt = "2024-01-23T02:00Z", + updatedAt = "2024-01-23T02:00:00Z", extractedAt = 200, generationId = 42, syncId = 42, @@ -872,21 +876,21 @@ abstract class BasicFunctionalityIntegrationTest( runSync( configContents, stream, - listOf(makeInputRecord(2, "2024-01-23T03:00Z", 300)), + listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)), ) dumpAndDiffRecords( parsedConfig, listOf( makeOutputRecord( id = 1, - updatedAt = "2024-01-23T02:00Z", + updatedAt = "2024-01-23T02:00:00Z", extractedAt = 200, generationId = 42, syncId = 42, ), makeOutputRecord( id = 2, - updatedAt = "2024-01-23T03:00Z", + updatedAt = "2024-01-23T03:00:00Z", extractedAt = 300, generationId = 42, syncId = 42, @@ -958,8 +962,8 @@ abstract class BasicFunctionalityIntegrationTest( configContents, stream1, listOf( - makeInputRecord(1, "2024-01-23T01:00Z", 100), - makeInputRecord(2, "2024-01-23T01:00Z", 100), + makeInputRecord(1, "2024-01-23T01:00:00Z", 100), + makeInputRecord(2, "2024-01-23T01:00:00Z", 100), ), ) dumpAndDiffRecords( @@ -967,14 +971,14 @@ abstract class BasicFunctionalityIntegrationTest( listOf( makeOutputRecord( id = 1, - updatedAt = "2024-01-23T01:00Z", + updatedAt = "2024-01-23T01:00:00Z", extractedAt = 100, generationId = 41, syncId = 41, ), makeOutputRecord( id = 2, - updatedAt = "2024-01-23T01:00Z", + updatedAt = "2024-01-23T01:00:00Z", extractedAt = 100, generationId = 41, syncId = 41, @@ -997,7 +1001,7 @@ abstract class BasicFunctionalityIntegrationTest( runSync( configContents, stream2, - listOf(makeInputRecord(1, "2024-01-23T02:00Z", 200)), + listOf(makeInputRecord(1, "2024-01-23T02:00:00Z", 200)), streamStatus = null, ) } @@ -1006,14 +1010,14 @@ abstract class BasicFunctionalityIntegrationTest( listOfNotNull( makeOutputRecord( id = 1, - updatedAt = "2024-01-23T01:00Z", + updatedAt = "2024-01-23T01:00:00Z", extractedAt = 100, generationId = 41, syncId = 41, ), makeOutputRecord( id = 2, - updatedAt = "2024-01-23T01:00Z", + updatedAt = "2024-01-23T01:00:00Z", extractedAt = 100, generationId = 41, syncId = 41, @@ -1021,7 +1025,7 @@ abstract class BasicFunctionalityIntegrationTest( if (commitDataIncrementally) { makeOutputRecord( id = 1, - updatedAt = "2024-01-23T02:00Z", + updatedAt = "2024-01-23T02:00:00Z", extractedAt = 200, generationId = 42, syncId = 42, @@ -1047,7 +1051,7 @@ abstract class BasicFunctionalityIntegrationTest( runSync( configContents, stream3, - listOf(makeInputRecord(2, "2024-01-23T03:00Z", 300)), + listOf(makeInputRecord(2, "2024-01-23T03:00:00Z", 300)), ) dumpAndDiffRecords( parsedConfig, @@ -1055,14 +1059,14 @@ abstract class BasicFunctionalityIntegrationTest( // records from sync 1 makeOutputRecord( id = 1, - updatedAt = "2024-01-23T01:00Z", + updatedAt = "2024-01-23T01:00:00Z", extractedAt = 100, generationId = 41, syncId = 41, ), makeOutputRecord( id = 2, - updatedAt = "2024-01-23T01:00Z", + updatedAt = "2024-01-23T01:00:00Z", extractedAt = 100, generationId = 41, syncId = 41, @@ -1070,7 +1074,7 @@ abstract class BasicFunctionalityIntegrationTest( // sync 2 makeOutputRecord( id = 1, - updatedAt = "2024-01-23T02:00Z", + updatedAt = "2024-01-23T02:00:00Z", extractedAt = 200, generationId = 42, syncId = 42, @@ -1078,7 +1082,7 @@ abstract class BasicFunctionalityIntegrationTest( // and sync 3 makeOutputRecord( id = 2, - updatedAt = "2024-01-23T03:00Z", + updatedAt = "2024-01-23T03:00:00Z", extractedAt = 300, generationId = 43, syncId = 43, @@ -1471,13 +1475,16 @@ abstract class BasicFunctionalityIntegrationTest( assertDoesNotThrow { runSync(configContents, DestinationCatalog(streams), messages) } } - /** A basic test that we handle all supported data types in a reasonable way. */ + /** + * A basic test that we handle all supported basic data types in a reasonable way. See also + * [testContainerTypes] for objects/arrays. + */ // Depending on how future connector development goes - we might need to do something similar to // BaseSqlGeneratorIntegrationTest, where we split out tests for connectors that do/don't // support safe_cast. (or, we move fully to in-connector typing, and we stop worrying about // per-destination safe_cast support). @Test - open fun testAllTypes() { + open fun testBasicTypes() { assumeTrue(verifyDataWriting) val stream = DestinationStream( @@ -1486,16 +1493,12 @@ abstract class BasicFunctionalityIntegrationTest( ObjectType( linkedMapOf( "id" to intType, + // Some destinations handle numbers differently in root and nested fields "struct" to FieldType( ObjectType(linkedMapOf("foo" to numberType)), nullable = true ), - "struct_schemaless" to - FieldType(ObjectTypeWithEmptySchema, nullable = true), - "struct_empty" to FieldType(ObjectTypeWithEmptySchema, nullable = true), - "array" to FieldType(ArrayType(numberType), nullable = true), - "array_schemaless" to FieldType(ArrayTypeWithoutSchema, nullable = true), "string" to FieldType(StringType, nullable = true), "number" to FieldType(NumberType, nullable = true), "integer" to FieldType(IntegerType, nullable = true), @@ -1508,11 +1511,6 @@ abstract class BasicFunctionalityIntegrationTest( "time_without_timezone" to FieldType(TimeTypeWithoutTimezone, nullable = true), "date" to FieldType(DateType, nullable = true), - "unknown" to - FieldType( - UnknownType(JsonNodeFactory.instance.textNode("test")), - nullable = true - ), ) ), generationId = 42, @@ -1535,11 +1533,6 @@ abstract class BasicFunctionalityIntegrationTest( """ { "id": 1, - "struct": {"foo": 1.0}, - "struct_schemaless": {"foo": 1.0}, - "struct_empty": {"foo": 1.0}, - "array": [1.0], - "array_schemaless": [1.0], "string": "foo", "number": 42.1, "integer": 42, @@ -1548,8 +1541,7 @@ abstract class BasicFunctionalityIntegrationTest( "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", - "date": "2023-01-23", - "unknown": {} + "date": "2023-01-23" } """.trimIndent() ), @@ -1558,11 +1550,6 @@ abstract class BasicFunctionalityIntegrationTest( """ { "id": 2, - "struct": null, - "struct_schemaless": null, - "struct_empty": null, - "array": null, - "array_schemaless": null, "string": null, "number": null, "integer": null, @@ -1571,26 +1558,23 @@ abstract class BasicFunctionalityIntegrationTest( "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, - "date": null, - "unknown": null + "date": null } """.trimIndent() ), // A record with all fields unset makeRecord("""{"id": 3}"""), - // A record that verifies floating-point behavior. - // 67.174118 cannot be represented as a standard float64 - // (it turns into 67.17411800000001). + // A record that verifies numeric behavior. + // 99999999999999999999999999999999 is out of range for int64. + // 50000.0000000000000001 can't be represented as a standard float64, + // and gets rounded off. makeRecord( """ { "id": 4, - "struct": {"foo": 67.174118}, - "struct_schemaless": {"foo": 67.174118}, - "struct_empty": {"foo": 67.174118}, - "array": [67.174118], - "array_schemaless": [67.174118], - "number": 67.174118 + "struct": {"foo": 50000.0000000000000001}, + "number": 50000.0000000000000001, + "integer": 99999999999999999999999999999999 } """.trimIndent(), ), @@ -1599,11 +1583,6 @@ abstract class BasicFunctionalityIntegrationTest( """ { "id": 5, - "struct": "foo", - "struct_schemaless": "foo", - "struct_empty": "foo", - "array": "foo", - "array_schemaless": "foo", "string": {}, "number": "foo", "integer": "foo", @@ -1621,30 +1600,46 @@ abstract class BasicFunctionalityIntegrationTest( val nestedFloat: BigDecimal val topLevelFloat: BigDecimal + val bigInt: BigInteger? + val bigIntChanges: List val badValuesData: Map val badValuesChanges: MutableList when (allTypesBehavior) { is StronglyTyped -> { nestedFloat = if (allTypesBehavior.nestedFloatLosesPrecision) { - BigDecimal("67.17411800000001") + BigDecimal("50000.0") } else { - BigDecimal("67.174118") + BigDecimal("50000.0000000000000001") } topLevelFloat = if (allTypesBehavior.topLevelFloatLosesPrecision) { - BigDecimal("67.17411800000001") + BigDecimal("50000.0") } else { - BigDecimal("67.174118") + BigDecimal("50000.0000000000000001") + } + bigInt = + if (allTypesBehavior.integerCanBeLarge) { + BigInteger("99999999999999999999999999999999") + } else { + null + } + bigIntChanges = + if (allTypesBehavior.integerCanBeLarge) { + emptyList() + } else { + listOf( + Change( + "integer", + AirbyteRecordMessageMetaChange.Change.NULLED, + AirbyteRecordMessageMetaChange.Reason + .DESTINATION_FIELD_SIZE_LIMITATION, + ) + ) } badValuesData = mapOf( "id" to 5, - "struct" to null, - "struct_schemaless" to null, - "struct_empty" to null, - "array" to null, - "array_schemaless" to null, "string" to if (allTypesBehavior.convertAllValuesToString) { "{}" @@ -1664,6 +1659,9 @@ abstract class BasicFunctionalityIntegrationTest( (stream.schema as ObjectType) .properties .keys + // id and struct don't have a bad value case here + // (id would make the test unusable; struct is tested in testContainerTypes) + .filter { it != "id" && it != "struct" } .map { key -> Change( key, @@ -1678,16 +1676,13 @@ abstract class BasicFunctionalityIntegrationTest( .toMutableList() } Untyped -> { - nestedFloat = BigDecimal("67.174118") - topLevelFloat = BigDecimal("67.174118") + nestedFloat = BigDecimal("50000.0000000000000001") + topLevelFloat = BigDecimal("50000.0000000000000001") + bigInt = BigInteger("99999999999999999999999999999999") + bigIntChanges = emptyList() badValuesData = mapOf( "id" to 5, - "struct" to "foo", - "struct_schemaless" to "foo", - "struct_empty" to "foo", - "array" to "foo", - "array_schemaless" to "foo", "string" to StringValue("{}"), "number" to "foo", "integer" to "foo", @@ -1714,11 +1709,6 @@ abstract class BasicFunctionalityIntegrationTest( data = mapOf( "id" to 1, - "struct" to mapOf("foo" to 1.0), - "struct_schemaless" to mapOf("foo" to 1.0), - "struct_empty" to mapOf("foo" to 1.0), - "array" to listOf(1.0), - "array_schemaless" to listOf(1.0), "string" to "foo", "number" to 42.1, "integer" to 42, @@ -1730,7 +1720,6 @@ abstract class BasicFunctionalityIntegrationTest( "time_with_timezone" to OffsetTime.parse("12:34:56Z"), "time_without_timezone" to LocalTime.parse("12:34:56"), "date" to LocalDate.parse("2023-01-23"), - "unknown" to mapOf(), ), airbyteMeta = OutputRecord.Meta(syncId = 42), ), @@ -1740,11 +1729,6 @@ abstract class BasicFunctionalityIntegrationTest( data = mapOf( "id" to 2, - "struct" to null, - "struct_schemaless" to null, - "struct_empty" to null, - "array" to null, - "array_schemaless" to null, "string" to null, "number" to null, "integer" to null, @@ -1754,7 +1738,6 @@ abstract class BasicFunctionalityIntegrationTest( "time_with_timezone" to null, "time_without_timezone" to null, "date" to null, - "unknown" to null, ), airbyteMeta = OutputRecord.Meta(syncId = 42), ), @@ -1771,13 +1754,10 @@ abstract class BasicFunctionalityIntegrationTest( mapOf( "id" to 4, "struct" to mapOf("foo" to nestedFloat), - "struct_schemaless" to mapOf("foo" to nestedFloat), - "struct_empty" to mapOf("foo" to nestedFloat), - "array" to listOf(nestedFloat), - "array_schemaless" to listOf(nestedFloat), "number" to topLevelFloat, + "integer" to bigInt, ), - airbyteMeta = OutputRecord.Meta(syncId = 42), + airbyteMeta = OutputRecord.Meta(syncId = 42, changes = bigIntChanges), ), OutputRecord( extractedAt = 100, @@ -1823,7 +1803,13 @@ abstract class BasicFunctionalityIntegrationTest( ), "empty_object" to FieldType(ObjectTypeWithEmptySchema, nullable = true), "schemaless_object" to FieldType(ObjectTypeWithoutSchema, nullable = true), + "schematized_array" to FieldType(ArrayType(intType), nullable = true), "schemaless_array" to FieldType(ArrayTypeWithoutSchema, nullable = true), + "unknown" to + FieldType( + UnknownType(JsonNodeFactory.instance.textNode("test")), + nullable = true + ), ), ), generationId = 42, @@ -1843,7 +1829,9 @@ abstract class BasicFunctionalityIntegrationTest( "schematized_object": { "id": 1, "name": "Joe" }, "empty_object": {}, "schemaless_object": { "uuid": "38F52396-736D-4B23-B5B4-F504D8894B97", "probability": 1.5 }, - "schemaless_array": [ 10, "foo", null, { "bar": "qua" } ] + "schematized_array": [10, null], + "schemaless_array": [ 10, "foo", null, { "bar": "qua" } ], + "unknown": {"foo": "bar"} }""".trimIndent(), emittedAtMs = 1602637589100, ), @@ -1856,7 +1844,9 @@ abstract class BasicFunctionalityIntegrationTest( "schematized_object": { "id": 2, "name": "Jane" }, "empty_object": {"extra": "stuff"}, "schemaless_object": { "address": { "street": "113 Hickey Rd", "zip": "37932" }, "flags": [ true, false, false ] }, - "schemaless_array": [] + "schematized_array": [], + "schemaless_array": [], + "unknown": {} }""".trimIndent(), emittedAtMs = 1602637589200, ), @@ -1869,7 +1859,9 @@ abstract class BasicFunctionalityIntegrationTest( "schematized_object": null, "empty_object": null, "schemaless_object": null, - "schemaless_array": null + "schematized_array": null, + "schemaless_array": null, + "unknown": null }""".trimIndent(), emittedAtMs = 1602637589300, ), @@ -1896,12 +1888,19 @@ abstract class BasicFunctionalityIntegrationTest( "probability" to 1.5 ) }, + "schematized_array" to listOf(10, null), "schemaless_array" to if (stringifySchemalessObjects) { """[10,"foo",null,{"bar":"qua"}]""" } else { listOf(10, "foo", null, mapOf("bar" to "qua")) }, + "unknown" to + if (stringifySchemalessObjects) { + """{"foo":"bar"}""" + } else { + mapOf("foo" to "bar") + }, ), airbyteMeta = OutputRecord.Meta(syncId = 42), ), @@ -1931,12 +1930,19 @@ abstract class BasicFunctionalityIntegrationTest( "flags" to listOf(true, false, false) ) }, + "schematized_array" to emptyList(), "schemaless_array" to if (stringifySchemalessObjects) { "[]" } else { emptyList() }, + "unknown" to + if (stringifySchemalessObjects) { + """{}""" + } else { + emptyMap() + }, ), airbyteMeta = OutputRecord.Meta(syncId = 42), ), @@ -1949,7 +1955,9 @@ abstract class BasicFunctionalityIntegrationTest( "schematized_object" to null, "empty_object" to null, "schemaless_object" to null, + "schematized_array" to null, "schemaless_array" to null, + "unknown" to null, ), airbyteMeta = OutputRecord.Meta(syncId = 42), ), diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt index eaf1522c36ed..a9703e40369e 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.data.ArrayType import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema import io.airbyte.cdk.load.data.BooleanType import io.airbyte.cdk.load.data.DateType +import io.airbyte.cdk.load.data.FieldType import io.airbyte.cdk.load.data.IntegerType import io.airbyte.cdk.load.data.NumberType import io.airbyte.cdk.load.data.ObjectType @@ -36,33 +37,21 @@ class AirbyteTypeToAvroSchema { airbyteSchema.properties.entries .fold(builder) { acc, (name, field) -> val converted = convert(field.type, path + name) - acc.name(name).let { - if (field.nullable && converted.type != Schema.Type.UNION) { - it.type( - SchemaBuilder.unionOf() - .nullType() - .and() - .type(converted) - .endUnion() - ) - .withDefault(null) - } else if (field.nullable && converted.type == Schema.Type.UNION) { - converted.types - .fold(SchemaBuilder.unionOf().nullType()) { acc, type -> - acc.and().type(type) - } - .endUnion() - .let { union -> it.type(union) } - .withDefault(null) + val propertySchema = maybeMakeNullable(field, converted) + acc.name(name).type(propertySchema).let { + if (field.nullable) { + it.withDefault(null) } else { - it.type(converted).noDefault() + it.noDefault() } } } .endRecord() } is ArrayType -> { - SchemaBuilder.array().items(convert(airbyteSchema.items.type, path + "items")) + val converted = convert(airbyteSchema.items.type, path + "items") + val itemsSchema = maybeMakeNullable(airbyteSchema.items, converted) + SchemaBuilder.array().items(itemsSchema) } is ArrayTypeWithoutSchema -> throw IllegalArgumentException("Array type without schema is not supported") @@ -89,9 +78,23 @@ class AirbyteTypeToAvroSchema { LogicalTypes.timestampMicros().addToSchema(schema) } is UnionType -> Schema.createUnion(airbyteSchema.options.map { convert(it, path) }) - is UnknownType -> SchemaBuilder.builder().nullType() + is UnknownType -> throw IllegalArgumentException("Unknown type is not supported") } } + + private fun maybeMakeNullable( + airbyteSchema: FieldType, + avroSchema: Schema, + ): Schema = + if (airbyteSchema.nullable && avroSchema.type != Schema.Type.UNION) { + SchemaBuilder.unionOf().nullType().and().type(avroSchema).endUnion() + } else if (airbyteSchema.nullable && avroSchema.type == Schema.Type.UNION) { + avroSchema.types + .fold(SchemaBuilder.unionOf().nullType()) { acc, type -> acc.and().type(type) } + .endUnion() + } else { + avroSchema + } } fun ObjectType.toAvroSchema(stream: DestinationStream.Descriptor): Schema { diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteValueToAvroRecord.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteValueToAvroRecord.kt index a6e5892e1955..d4957f2adbc9 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteValueToAvroRecord.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteValueToAvroRecord.kt @@ -57,7 +57,7 @@ class AirbyteValueToAvroRecord { is BooleanValue -> return airbyteValue.value is DateValue -> throw IllegalArgumentException("String-based date types are not supported") - is IntegerValue -> return airbyteValue.value + is IntegerValue -> return airbyteValue.value.toLong() is IntValue -> return airbyteValue.value is NullValue -> return null is NumberValue -> return airbyteValue.value.toDouble() diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroMapperPipelineFactory.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroMapperPipelineFactory.kt index 779f7402f6d0..fc13493b5e81 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroMapperPipelineFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroMapperPipelineFactory.kt @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.data.AirbyteValueNoopMapper import io.airbyte.cdk.load.data.MapperPipeline import io.airbyte.cdk.load.data.MapperPipelineFactory import io.airbyte.cdk.load.data.MergeUnions +import io.airbyte.cdk.load.data.NullOutOfRangeIntegers import io.airbyte.cdk.load.data.SchemalessTypesToJsonString import io.airbyte.cdk.load.data.SchemalessValuesToJsonString import io.airbyte.cdk.load.data.TimeStringToInteger @@ -20,6 +21,7 @@ class AvroMapperPipelineFactory : MapperPipelineFactory { stream.schema, listOf( SchemalessTypesToJsonString() to SchemalessValuesToJsonString(), + AirbyteSchemaNoopMapper() to NullOutOfRangeIntegers(), AirbyteSchemaNoopMapper() to TimeStringToInteger(), MergeUnions() to AirbyteValueNoopMapper(), ), diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt index cbdc5532cfcd..6556e1d40731 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt @@ -33,6 +33,8 @@ import io.airbyte.cdk.load.data.TimestampValue import io.airbyte.cdk.load.data.UnionType import io.airbyte.cdk.load.data.UnknownType import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset import org.apache.avro.generic.GenericArray import org.apache.avro.generic.GenericRecord import org.apache.avro.util.Utf8 @@ -63,7 +65,12 @@ class AvroRecordToAirbyteValue { is BooleanType -> return BooleanValue(avroValue as Boolean) is DateType -> return DateValue( - Instant.ofEpochMilli((avroValue as Int).toLong() * 86400000).toString() + LocalDateTime.ofInstant( + Instant.ofEpochMilli((avroValue as Int).toLong() * 86400000), + ZoneOffset.UTC + ) + .toLocalDate() + .toString() ) is IntegerType -> return IntegerValue(avroValue as Long) is NumberType -> return NumberValue((avroValue as Double).toBigDecimal()) @@ -80,14 +87,30 @@ class AvroRecordToAirbyteValue { throw IllegalArgumentException("Unsupported string type: $avroValue") } ) - is TimeTypeWithoutTimezone, + is TimeTypeWithoutTimezone -> + return TimeValue( + Instant.ofEpochMilli((avroValue as Long) / 1000) + .atOffset(ZoneOffset.UTC) + .toLocalTime() + .toString() + ) is TimeTypeWithTimezone -> return TimeValue( - Instant.ofEpochMilli((avroValue as Long) / 1000).toString().substring(11) + Instant.ofEpochMilli((avroValue as Long) / 1000) + .atOffset(ZoneOffset.UTC) + .toOffsetTime() + .toString() + ) + is TimestampTypeWithoutTimezone -> + return TimestampValue( + LocalDateTime.ofInstant( + Instant.ofEpochMilli((avroValue as Long) / 1000), + ZoneOffset.UTC + ) + .toString() ) - is TimestampTypeWithoutTimezone, is TimestampTypeWithTimezone -> - return TimestampValue(Instant.ofEpochMilli(avroValue as Long).toString()) + return TimestampValue(Instant.ofEpochMilli((avroValue as Long) / 1000).toString()) is UnionType -> return tryConvertUnion(avroValue, schema) is UnknownType -> throw UnsupportedOperationException("UnknownType is not supported") else -> throw IllegalArgumentException("Unsupported schema type: $schema") diff --git a/airbyte-cdk/bulk/toolkits/load-csv/src/testFixtures/kotlin/io/airbyte/cdk/load/data/csv/CsvRowToAirbyteValue.kt b/airbyte-cdk/bulk/toolkits/load-csv/src/testFixtures/kotlin/io/airbyte/cdk/load/data/csv/CsvRowToAirbyteValue.kt index 47d6c9113446..b2fde6a6425e 100644 --- a/airbyte-cdk/bulk/toolkits/load-csv/src/testFixtures/kotlin/io/airbyte/cdk/load/data/csv/CsvRowToAirbyteValue.kt +++ b/airbyte-cdk/bulk/toolkits/load-csv/src/testFixtures/kotlin/io/airbyte/cdk/load/data/csv/CsvRowToAirbyteValue.kt @@ -39,7 +39,6 @@ import org.apache.commons.csv.CSVRecord class CsvRowToAirbyteValue { fun convert(row: CSVRecord, schema: AirbyteType): AirbyteValue { - print("converting row: $row") if (schema !is ObjectType) { throw IllegalArgumentException("Only object types are supported") } @@ -59,60 +58,64 @@ class CsvRowToAirbyteValue { if (value.isBlank()) { return NullValue } - return when (field) { - is ArrayType -> { - value - .deserializeToNode() - .elements() - .asSequence() - .map { it.toAirbyteValue(field.items.type) } - .toList() - .let(::ArrayValue) - } - is ArrayTypeWithoutSchema -> - value.deserializeToNode().toAirbyteValue(ArrayTypeWithoutSchema) - is BooleanType -> BooleanValue(value.toBoolean()) - is IntegerType -> IntegerValue(value.toLong()) - is NumberType -> NumberValue(value.toBigDecimal()) - is ObjectType -> { - val properties = linkedMapOf() - value - .deserializeToNode() - .fields() - .asSequence() - .map { entry -> - val type = - field.properties[entry.key]?.type - ?: UnknownType(value.deserializeToNode()) - entry.key to entry.value.toAirbyteValue(type) - } - .toMap(properties) - ObjectValue(properties) - } - is ObjectTypeWithEmptySchema -> - value.deserializeToNode().toAirbyteValue(ObjectTypeWithEmptySchema) - is ObjectTypeWithoutSchema -> - value.deserializeToNode().toAirbyteValue(ObjectTypeWithoutSchema) - is StringType -> StringValue(value) - is UnionType -> { - // Use the options sorted with string last since it always works - field.options - .sortedBy { it is StringType } - .firstNotNullOfOrNull { option -> - try { - convertInner(value, option) - } catch (e: Exception) { - null + return try { + when (field) { + is ArrayType -> { + value + .deserializeToNode() + .elements() + .asSequence() + .map { it.toAirbyteValue(field.items.type) } + .toList() + .let(::ArrayValue) + } + is ArrayTypeWithoutSchema -> + value.deserializeToNode().toAirbyteValue(ArrayTypeWithoutSchema) + is BooleanType -> BooleanValue(value.toBooleanStrict()) + is IntegerType -> IntegerValue(value.toBigInteger()) + is NumberType -> NumberValue(value.toBigDecimal()) + is ObjectType -> { + val properties = linkedMapOf() + value + .deserializeToNode() + .fields() + .asSequence() + .map { entry -> + val type = + field.properties[entry.key]?.type + ?: UnknownType(value.deserializeToNode()) + entry.key to entry.value.toAirbyteValue(type) + } + .toMap(properties) + ObjectValue(properties) + } + is ObjectTypeWithEmptySchema -> + value.deserializeToNode().toAirbyteValue(ObjectTypeWithEmptySchema) + is ObjectTypeWithoutSchema -> + value.deserializeToNode().toAirbyteValue(ObjectTypeWithoutSchema) + is StringType -> StringValue(value) + is UnionType -> { + // Use the options sorted with string last since it always works + field.options + .sortedBy { it is StringType } + .firstNotNullOfOrNull { option -> + try { + convertInner(value, option) + } catch (e: Exception) { + null + } } - } - ?: NullValue + ?: NullValue + } + DateType -> DateValue(value) + TimeTypeWithTimezone, + TimeTypeWithoutTimezone -> TimeValue(value) + TimestampTypeWithTimezone, + TimestampTypeWithoutTimezone -> TimestampValue(value) + is UnknownType -> UnknownValue(value.deserializeToNode()) } - DateType -> DateValue(value) - TimeTypeWithTimezone, - TimeTypeWithoutTimezone -> TimeValue(value) - TimestampTypeWithTimezone, - TimestampTypeWithoutTimezone -> TimestampValue(value) - is UnknownType -> UnknownValue(value.deserializeToNode()) + } catch (e: Exception) { + StringValue(value) } } } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt index aeffbb18a949..08da74630952 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt @@ -21,8 +21,8 @@ import java.time.format.DateTimeFormatter import java.util.* interface PathFactory { - fun getStagingDirectory(stream: DestinationStream): Path - fun getFinalDirectory(stream: DestinationStream): Path + fun getStagingDirectory(stream: DestinationStream, streamConstant: Boolean = false): Path + fun getFinalDirectory(stream: DestinationStream, streamConstant: Boolean = false): Path fun getPathToFile( stream: DestinationStream, partNumber: Long?, @@ -193,6 +193,7 @@ class ObjectStoragePathFactory( PathVariable("EPOCH", """\d+""") { it.time.toEpochMilli().toString() }, PathVariable("UUID", """[a-fA-F0-9\\-]{36}""") { UUID.randomUUID().toString() } ) + val PATH_VARIABLES_STREAM_CONSTANT = PATH_VARIABLES.filter { it.variable != "UUID" } val FILENAME_VARIABLES = listOf( FileVariable("date", """\d{4}_\d{2}_\d{2}""") { DATE_FORMATTER.format(it.time) }, @@ -218,13 +219,21 @@ class ObjectStoragePathFactory( } } - override fun getStagingDirectory(stream: DestinationStream): Path { - val path = getFormattedPath(stream) + override fun getStagingDirectory(stream: DestinationStream, streamConstant: Boolean): Path { + val path = + getFormattedPath( + stream, + if (streamConstant) PATH_VARIABLES_STREAM_CONSTANT else PATH_VARIABLES + ) return Paths.get(stagingPrefix, path) } - override fun getFinalDirectory(stream: DestinationStream): Path { - val path = getFormattedPath(stream) + override fun getFinalDirectory(stream: DestinationStream, streamConstant: Boolean): Path { + val path = + getFormattedPath( + stream, + if (streamConstant) PATH_VARIABLES_STREAM_CONSTANT else PATH_VARIABLES + ) return Paths.get(prefix, path) } @@ -247,10 +256,13 @@ class ObjectStoragePathFactory( return path.resolve(fileName) } - private fun getFormattedPath(stream: DestinationStream): String { + private fun getFormattedPath( + stream: DestinationStream, + variables: List = PATH_VARIABLES + ): String { val pattern = pathPatternResolved val context = VariableContext(stream) - return PATH_VARIABLES.fold(pattern) { acc, variable -> variable.maybeApply(acc, context) } + return variables.fold(pattern) { acc, variable -> variable.maybeApply(acc, context) } } private fun getFormattedFileName(context: VariableContext): String { diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt index 8f77be8349e5..de5509a8570d 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt @@ -13,8 +13,8 @@ import io.airbyte.cdk.load.file.object_storage.PathFactory import io.airbyte.cdk.load.file.object_storage.RemoteObject import io.airbyte.cdk.load.state.DestinationState import io.airbyte.cdk.load.state.DestinationStatePersister +import io.airbyte.cdk.load.util.readIntoClass import io.airbyte.cdk.load.util.serializeToJsonBytes -import io.airbyte.cdk.util.Jsons import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Secondary @@ -120,9 +120,7 @@ class ObjectStorageStagingPersister( try { log.info { "Loading destination state from $key" } return client.get(key) { inputStream -> - Jsons.readTree(inputStream).let { - Jsons.treeToValue(it, ObjectStorageDestinationState::class.java) - } + inputStream.readIntoClass(ObjectStorageDestinationState::class.java) } } catch (e: Exception) { log.info { "No destination state found at $key: $e" } @@ -141,10 +139,17 @@ class ObjectStorageFallbackPersister( private val pathFactory: PathFactory ) : DestinationStatePersister { override suspend fun load(stream: DestinationStream): ObjectStorageDestinationState { - val prefix = pathFactory.prefix val matcher = pathFactory.getPathMatcher(stream) + val pathConstant = pathFactory.getFinalDirectory(stream, streamConstant = true).toString() + val firstVariableIndex = pathConstant.indexOfFirst { it == '$' } + val longestUnambiguous = + if (firstVariableIndex > 0) { + pathConstant.substring(0, firstVariableIndex) + } else { + pathConstant + } client - .list(prefix) + .list(longestUnambiguous) .mapNotNull { matcher.match(it.key) } .toList() .groupBy { diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt index 991283ad420d..1d0afd664836 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt @@ -81,7 +81,6 @@ class ObjectStorageStreamLoader, U : OutputStream>( val state = destinationStateManager.getState(stream) val maxPartNumber = state.generations - .filter { it.generationId >= stream.minimumGenerationId } .mapNotNull { it.objects.maxOfOrNull { obj -> obj.partNumber } } .maxOrNull() log.info { "Got max part number from destination state: $maxPartNumber" } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateTest.kt index ae7f682ee038..532d56654c68 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateTest.kt @@ -155,7 +155,8 @@ class ObjectStorageDestinationStateTest { @Test fun testRecoveringFromMetadata(d: Dependencies) = runTest { val genIdKey = ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY - val prefix = d.pathFactory.prefix + val prefix = + "${d.pathFactory.prefix}/${stream1.descriptor.namespace}/${stream1.descriptor.name}/" val generations = listOf( Triple(0, "$prefix/key1-0", 0L), diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt index 52f3d8e88052..ed38e692eacc 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt @@ -25,11 +25,11 @@ open class MockPathFactory : PathFactory { return "/${stream.descriptor.namespace}/${stream.descriptor.name}" } - override fun getStagingDirectory(stream: DestinationStream): Path { + override fun getStagingDirectory(stream: DestinationStream, streamConstant: Boolean): Path { return Path.of("$prefix/staging/${fromStream(stream)}") } - override fun getFinalDirectory(stream: DestinationStream): Path { + override fun getFinalDirectory(stream: DestinationStream, streamConstant: Boolean): Path { return Path.of("$prefix/${fromStream(stream)}") } diff --git a/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineFactory.kt b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineFactory.kt index 393454d22bdb..c4f6ccbe0e29 100644 --- a/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/data/parquet/ParquetMapperPipelineFactory.kt @@ -10,6 +10,7 @@ import io.airbyte.cdk.load.data.AirbyteValueNoopMapper import io.airbyte.cdk.load.data.MapperPipeline import io.airbyte.cdk.load.data.MapperPipelineFactory import io.airbyte.cdk.load.data.MergeUnions +import io.airbyte.cdk.load.data.NullOutOfRangeIntegers import io.airbyte.cdk.load.data.SchemalessTypesToJsonString import io.airbyte.cdk.load.data.SchemalessValuesToJsonString import io.airbyte.cdk.load.data.TimeStringToInteger @@ -22,6 +23,7 @@ class ParquetMapperPipelineFactory : MapperPipelineFactory { stream.schema, listOf( SchemalessTypesToJsonString() to SchemalessValuesToJsonString(), + AirbyteSchemaNoopMapper() to NullOutOfRangeIntegers(), AirbyteSchemaNoopMapper() to TimeStringToInteger(), MergeUnions() to AirbyteValueNoopMapper(), UnionTypeToDisjointRecord() to UnionValueToDisjointRecord(), diff --git a/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/file/parquet/ParquetWriter.kt b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/file/parquet/ParquetWriter.kt index a64fd51e137d..d5e5bdc881ae 100644 --- a/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/file/parquet/ParquetWriter.kt +++ b/airbyte-cdk/bulk/toolkits/load-parquet/src/main/kotlin/io/airbyte/cdk/load/file/parquet/ParquetWriter.kt @@ -10,6 +10,7 @@ import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE import org.apache.parquet.hadoop.ParquetWriter as ApacheParquetWriter import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.parquet.io.OutputFile @@ -52,7 +53,8 @@ fun OutputStream.toParquetWriter( val writer = AvroParquetWriter.builder(outputFile) .withSchema(avroSchema) - .withConf(Configuration()) + // needed so that we can have arrays containing null elements + .withConf(Configuration().apply { setBoolean(WRITE_OLD_LIST_STRUCTURE, false) }) .withCompressionCodec(config.compressionCodec) .withRowGroupSize(config.blockSizeMb * 1024 * 1024L) .withPageSize(config.pageSizeKb * 1024) diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index feb39d328004..bdcf01acb8c6 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: d6116991-e809-4c7c-ae09-c64712df5b66 - dockerImageTag: 0.2.2 + dockerImageTag: 0.2.3 dockerRepository: airbyte/destination-s3-v2 githubIssueLabel: destination-s3-v2 icon: s3.svg diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvAssumeRoleDestinationAcceptanceTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvAssumeRoleDestinationAcceptanceTest.kt index 5c61b1978f51..3ddeb52f69f4 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvAssumeRoleDestinationAcceptanceTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2CsvAssumeRoleDestinationAcceptanceTest.kt @@ -5,7 +5,9 @@ package io.airbyte.integrations.destination.s3 import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.integrations.destination.s3.S3BaseCsvDestinationAcceptanceTest +import org.junit.jupiter.api.Disabled +@Disabled class S3V2CsvAssumeRoleDestinationAcceptanceTest : S3BaseCsvDestinationAcceptanceTest() { override val imageName: String = "airbyte/destination-s3-v2:dev" override val baseConfigJson: JsonNode diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index 4cc8e9b28dd4..66c9baab1851 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -19,8 +19,9 @@ abstract class S3V2WriteTest( promoteUnionToObject: Boolean, preserveUndeclaredFields: Boolean, /** This is false for staging mode, and true for non-staging mode. */ - commitDataIncrementally: Boolean = false, + commitDataIncrementally: Boolean = true, allTypesBehavior: AllTypesBehavior, + nullEqualsUnset: Boolean = false, ) : BasicFunctionalityIntegrationTest( S3V2TestUtils.getConfig(path), @@ -35,6 +36,7 @@ abstract class S3V2WriteTest( preserveUndeclaredFields = preserveUndeclaredFields, commitDataIncrementally = commitDataIncrementally, allTypesBehavior = allTypesBehavior, + nullEqualsUnset = nullEqualsUnset, ) { @Test override fun testBasicWrite() { @@ -46,7 +48,7 @@ abstract class S3V2WriteTest( super.testFunkyCharacters() } - @Disabled + @Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/10413?") @Test override fun testMidSyncCheckpointingStreamState() { super.testMidSyncCheckpointingStreamState() @@ -57,7 +59,7 @@ abstract class S3V2WriteTest( super.testAppend() } - @Disabled("append mode doesn't yet work") + @Disabled("Irrelevant for file destinations") @Test override fun testAppendSchemaEvolution() { super.testAppendSchemaEvolution() @@ -78,19 +80,15 @@ abstract class S3V2WriteTest( super.testUnions() } - @Disabled("connector doesn't yet do refreshes correctly - data from failed sync is lost") @Test override fun testInterruptedTruncateWithPriorData() { super.testInterruptedTruncateWithPriorData() } - - @Disabled("connector doesn't yet do refreshes correctly - failed sync deletes old data") @Test override fun resumeAfterCancelledTruncate() { super.resumeAfterCancelledTruncate() } - @Disabled("connector doesn't yet do refreshes correctly - failed sync deletes old data") @Test override fun testInterruptedTruncateWithoutPriorData() { super.testInterruptedTruncateWithoutPriorData() @@ -149,13 +147,9 @@ class S3V2WriteTestCsvRootLevelFlattening : promoteUnionToObject = false, preserveUndeclaredFields = false, allTypesBehavior = Untyped, - ) { - @Disabled("Does not work yet") - @Test - override fun testAllTypes() { - super.testAllTypes() - } -} + nullEqualsUnset = + true, // Technically true of unflattened as well, but no top-level fields are nullable + ) class S3V2WriteTestCsvGzip : S3V2WriteTest( @@ -172,14 +166,9 @@ class S3V2WriteTestAvroUncompressed : stringifySchemalessObjects = true, promoteUnionToObject = false, preserveUndeclaredFields = false, - allTypesBehavior = StronglyTyped(), - ) { - @Disabled("Test does not support `stringifyShemalessObjects == true`") - @Test - override fun testAllTypes() { - super.testAllTypes() - } -} + allTypesBehavior = StronglyTyped(integerCanBeLarge = false), + nullEqualsUnset = true, + ) class S3V2WriteTestAvroBzip2 : S3V2WriteTest( @@ -187,14 +176,9 @@ class S3V2WriteTestAvroBzip2 : stringifySchemalessObjects = true, promoteUnionToObject = false, preserveUndeclaredFields = false, - allTypesBehavior = StronglyTyped(), - ) { - @Disabled("Test does not support `stringifyShemalessObjects == true`") - @Test - override fun testAllTypes() { - super.testAllTypes() - } -} + allTypesBehavior = StronglyTyped(integerCanBeLarge = false), + nullEqualsUnset = true, + ) class S3V2WriteTestParquetUncompressed : S3V2WriteTest( @@ -202,14 +186,9 @@ class S3V2WriteTestParquetUncompressed : stringifySchemalessObjects = true, promoteUnionToObject = true, preserveUndeclaredFields = false, - allTypesBehavior = StronglyTyped(), - ) { - @Disabled("Test does not support `stringifyShemalessObjects == true`") - @Test - override fun testAllTypes() { - super.testAllTypes() - } -} + allTypesBehavior = StronglyTyped(integerCanBeLarge = false), + nullEqualsUnset = true, + ) class S3V2WriteTestParquetSnappy : S3V2WriteTest( @@ -217,11 +196,6 @@ class S3V2WriteTestParquetSnappy : stringifySchemalessObjects = true, promoteUnionToObject = true, preserveUndeclaredFields = false, - allTypesBehavior = StronglyTyped(), - ) { - @Disabled("Test does not support `stringifyShemalessObjects == true`") - @Test - override fun testAllTypes() { - super.testAllTypes() - } -} + allTypesBehavior = StronglyTyped(integerCanBeLarge = false), + nullEqualsUnset = true, + )