From 022b316f831e82ce910076fa3f9f1ced59338ded Mon Sep 17 00:00:00 2001
From: Edward Gao
Date: Thu, 2 Jan 2025 16:28:18 -0800
Subject: [PATCH] [Load CDK] test improvements (#50406)

---
 .../BasicFunctionalityIntegrationTest.kt      | 125 +++++++++++++++---
 .../data/avro/AvroExpectedRecordMapper.kt     |  69 ++++++++++
 .../data/avro/AvroRecordToAirbyteValue.kt     | 125 +++--------------
 .../airbyte/cdk/load/file/avro/AvroReader.kt  |  10 +-
 .../cdk/load/ObjectStorageDataDumper.kt       |  25 +---
 .../cdk/load/file/parquet/ParquetReader.kt    |   7 +-
 .../iceberg/v2/IcebergV2WriteTest.kt          |   7 +-
 .../connectors/destination-s3-v2/build.gradle |   3 +
 .../destination/s3_v2/S3V2WriteTest.kt        |  29 ++--
 9 files changed, 225 insertions(+), 175 deletions(-)
 create mode 100644 airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroExpectedRecordMapper.kt

diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt
index dddfd73562d3..1625286f47be 100644
--- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt
+++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt
@@ -68,7 +68,6 @@ import org.junit.jupiter.api.Assumptions.assumeTrue
 import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.assertAll
-import org.junit.jupiter.api.assertThrows

 sealed interface AllTypesBehavior

@@ -128,7 +127,7 @@ abstract class BasicFunctionalityIntegrationTest(
      */
     val commitDataIncrementally: Boolean,
     val allTypesBehavior: AllTypesBehavior,
-    val failOnUnknownTypes: Boolean = false,
+    val nullUnknownTypes: Boolean = false,
     nullEqualsUnset: Boolean = false,
     configUpdater: ConfigurationUpdater = FakeConfigurationUpdater,
 ) :
@@ -1954,13 +1953,15 @@
     @Test
     open fun testUnknownTypes() {
+        assumeTrue(verifyDataWriting)
         val stream =
             DestinationStream(
                 DestinationStream.Descriptor(randomizedNamespace, "problematic_types"),
                 Append,
                 ObjectType(
                     linkedMapOf(
-                        "id" to
+                        "id" to intType,
+                        "name" to
                             FieldType(
                                 UnknownType(
                                     JsonNodeFactory.instance.objectNode().put("type", "whatever")
                                 )
@@ -1982,7 +1983,8 @@
                     "problematic_types",
                     """
                         {
-                          "id": "ex falso quodlibet"
+                          "id": 1,
+                          "name": "ex falso quodlibet"
                         }""".trimIndent(),
                     emittedAtMs = 1602637589100,
                 )
@@ -1996,28 +1998,41 @@
                     generationId = 42,
                     data =
                         mapOf(
-                            "id" to "ex falso quodlibet",
+                            "id" to 1,
+                            "name" to
+                                if (nullUnknownTypes) {
+                                    null
+                                } else {
+                                    "ex falso quodlibet"
+                                },
+                        ),
+                    airbyteMeta =
+                        OutputRecord.Meta(
+                            syncId = 42,
+                            changes =
+                                if (nullUnknownTypes) {
+                                    listOf(
+                                        Change(
+                                            "name",
+                                            AirbyteRecordMessageMetaChange.Change.NULLED,
+                                            AirbyteRecordMessageMetaChange.Reason
+                                                .DESTINATION_SERIALIZATION_ERROR
+                                        )
+                                    )
+                                } else {
+                                    emptyList()
+                                }
                         ),
-                    airbyteMeta = OutputRecord.Meta(syncId = 42),
                 ),
             )
-        val dumpBlock = {
-            dumpAndDiffRecords(
-                parsedConfig,
-                expectedRecords,
-                stream,
-                primaryKey = listOf(listOf("id")),
-                cursor = null,
-            )
-        }
-        if (failOnUnknownTypes) {
-            // Note: this will not catch assertion errors against data
-            // if the destination actually succeeds (by design).
-            assertThrows { dumpBlock() }
-        } else {
-            dumpBlock()
-        }
+        dumpAndDiffRecords(
+            parsedConfig,
+            expectedRecords,
+            stream,
+            primaryKey = listOf(listOf("id")),
+            cursor = null,
+        )
     }

     /**
@@ -2043,6 +2058,15 @@
                         // Our AirbyteType treats them identically, so we don't need two test cases.
                         "combined_type" to
                             FieldType(UnionType.of(StringType, IntegerType), nullable = true),
+                        // For destinations which promote unions to objects,
+                        // and also stringify schemaless values,
+                        // we should verify that the promoted schemaless value
+                        // is still labelled as "object" rather than "string".
+                        "union_of_string_and_schemaless_type" to
+                            FieldType(
+                                UnionType.of(ObjectTypeWithoutSchema, IntegerType),
+                                nullable = true,
+                            ),
                         "union_of_objects_with_properties_identical" to
                             FieldType(
                                 UnionType.of(
@@ -2132,6 +2156,7 @@
                         {
                           "id": 1,
                           "combined_type": "string1",
+                          "union_of_string_and_schemaless_type": {"foo": "bar"},
                           "union_of_objects_with_properties_identical": { "id": 10, "name": "Joe" },
                           "union_of_objects_with_properties_overlapping": { "id": 20, "name": "Jane", "flagged": true },
                           "union_of_objects_with_properties_contradicting": { "id": 1, "name": "Jenny" },
@@ -2188,6 +2213,15 @@
                     mapOf(
                         "id" to 1,
                         "combined_type" to maybePromote("string", "string1"),
+                        "union_of_string_and_schemaless_type" to
+                            maybePromote(
+                                "object",
+                                if (stringifySchemalessObjects) {
+                                    """{"foo":"bar"}"""
+                                } else {
+                                    mapOf("foo" to "bar")
+                                }
+                            ),
                         "union_of_objects_with_properties_identical" to
                             mapOf("id" to 10, "name" to "Joe"),
                         "union_of_objects_with_properties_overlapping" to
@@ -2250,6 +2284,53 @@
         )
     }

+    /**
+     * Verify that we can handle a stream with 0 columns. This is... not particularly useful, but
+     * happens sometimes.
+     */
+    open fun testNoColumns() {
+        val stream =
+            DestinationStream(
+                DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
+                Append,
+                ObjectType(linkedMapOf()),
+                generationId = 42,
+                minimumGenerationId = 0,
+                syncId = 42,
+            )
+        runSync(
+            configContents,
+            stream,
+            listOf(
+                InputRecord(
+                    randomizedNamespace,
+                    "test_stream",
+                    """{"foo": "bar"}""",
+                    emittedAtMs = 1000L,
+                )
+            )
+        )
+        dumpAndDiffRecords(
+            parsedConfig,
+            listOf(
+                OutputRecord(
+                    extractedAt = 1000L,
+                    generationId = 42,
+                    data =
+                        if (preserveUndeclaredFields) {
+                            mapOf("foo" to "bar")
+                        } else {
+                            emptyMap()
+                        },
+                    airbyteMeta = OutputRecord.Meta(syncId = 42),
+                )
+            ),
+            stream,
+            primaryKey = listOf(),
+            cursor = null,
+        )
+    }
+
     companion object {
         private val intType = FieldType(IntegerType, nullable = true)
         private val numberType = FieldType(NumberType, nullable = true)
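A note on the nullUnknownTypes flag introduced above: for destinations that opt in, testUnknownTypes now expects a field with an unresolvable declared type to come back as null, with the reason recorded in airbyte_meta.changes. A minimal self-contained sketch of that contract, using plain Kotlin maps rather than the CDK's OutputRecord types (all names here are illustrative, not CDK API):

    data class MetaChange(val field: String, val change: String, val reason: String)

    // Null out fields whose declared type the destination cannot represent, and
    // record why -- mirroring the Change("name", NULLED,
    // DESTINATION_SERIALIZATION_ERROR) expectation in testUnknownTypes.
    fun nullUnknownFields(
        record: MutableMap<String, Any?>,
        unknownFields: Set<String>,
    ): List<MetaChange> =
        unknownFields.filter { record[it] != null }.map { field ->
            record[field] = null
            MetaChange(field, "NULLED", "DESTINATION_SERIALIZATION_ERROR")
        }

    fun main() {
        val record = mutableMapOf<String, Any?>("id" to 1, "name" to "ex falso quodlibet")
        val changes = nullUnknownFields(record, setOf("name"))
        println(record)  // {id=1, name=null}
        println(changes) // [MetaChange(field=name, change=NULLED, reason=DESTINATION_SERIALIZATION_ERROR)]
    }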
diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroExpectedRecordMapper.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroExpectedRecordMapper.kt
new file mode 100644
index 000000000000..bc771a739598
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroExpectedRecordMapper.kt
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.data.avro
+
+import io.airbyte.cdk.load.data.AirbyteType
+import io.airbyte.cdk.load.data.AirbyteValue
+import io.airbyte.cdk.load.data.ArrayValue
+import io.airbyte.cdk.load.data.DateValue
+import io.airbyte.cdk.load.data.IntegerValue
+import io.airbyte.cdk.load.data.ObjectValue
+import io.airbyte.cdk.load.data.TimeWithTimezoneValue
+import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue
+import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
+import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
+import io.airbyte.cdk.load.test.util.ExpectedRecordMapper
+import io.airbyte.cdk.load.test.util.OutputRecord
+import java.time.LocalDate
+import java.time.ZoneOffset
+import java.time.temporal.ChronoField
+import java.time.temporal.TemporalAccessor
+
+object AvroExpectedRecordMapper : ExpectedRecordMapper {
+    override fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord {
+        return expectedRecord.copy(data = timestampsToInteger(expectedRecord.data) as ObjectValue)
+    }
+
+    /**
+     * Avro doesn't have true temporal types. Instead, we write dates as epoch days, and other
+     * temporal types as epochMicros. Therefore, in expected records, we should convert from real
+     * temporal types to IntegerValue.
+     */
+    private fun timestampsToInteger(value: AirbyteValue): AirbyteValue =
+        when (value) {
+            is DateValue -> IntegerValue(value.value.toEpochDay())
+            is TimestampWithTimezoneValue -> {
+                val micros = getMicros(value.value)
+                val epochSecond = value.value.toEpochSecond()
+                integerValue(epochSecond, micros)
+            }
+            is TimestampWithoutTimezoneValue -> {
+                val micros = getMicros(value.value)
+                val epochSecond = value.value.toEpochSecond(ZoneOffset.UTC)
+                integerValue(epochSecond, micros)
+            }
+            is TimeWithTimezoneValue -> {
+                val micros = getMicros(value.value)
+                val epochSecond = value.value.toEpochSecond(LocalDate.EPOCH)
+                integerValue(epochSecond, micros)
+            }
+            is TimeWithoutTimezoneValue -> {
+                val micros = getMicros(value.value)
+                val epochSecond = value.value.toEpochSecond(LocalDate.EPOCH, ZoneOffset.UTC)
+                integerValue(epochSecond, micros)
+            }
+            is ArrayValue -> ArrayValue(value.values.map { timestampsToInteger(it) })
+            is ObjectValue ->
+                ObjectValue(
+                    value.values.mapValuesTo(linkedMapOf()) { (_, v) -> timestampsToInteger(v) }
+                )
+            else -> value
+        }
+
+    private fun getMicros(value: TemporalAccessor) = value.getLong(ChronoField.MICRO_OF_SECOND)
+
+    private fun integerValue(epochSecond: Long, micros: Long) =
+        IntegerValue(epochSecond * 1_000_000 + micros)
+}
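To make the epoch-days/epochMicros convention in AvroExpectedRecordMapper concrete, here is a small self-contained example using only java.time; the sample values are worked out by hand:

    import java.time.LocalDate
    import java.time.OffsetDateTime
    import java.time.temporal.ChronoField

    fun main() {
        // Timestamps become epochSecond * 1_000_000 + microOfSecond:
        val ts = OffsetDateTime.parse("2023-01-23T12:34:56.789012Z")
        val micros = ts.getLong(ChronoField.MICRO_OF_SECOND) // 789012
        val epochSecond = ts.toEpochSecond() // 1674477296
        println(epochSecond * 1_000_000 + micros) // 1674477296789012
        // Dates become epoch days instead:
        println(LocalDate.parse("2023-01-23").toEpochDay()) // 19380
    }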
diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt
index 1c2136c13286..dc3b6f701cb4 100644
--- a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt
+++ b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt
@@ -4,128 +4,43 @@

 package io.airbyte.cdk.load.data.avro

-import io.airbyte.cdk.load.data.AirbyteType
 import io.airbyte.cdk.load.data.AirbyteValue
-import io.airbyte.cdk.load.data.ArrayType
-import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema
 import io.airbyte.cdk.load.data.ArrayValue
-import io.airbyte.cdk.load.data.BooleanType
 import io.airbyte.cdk.load.data.BooleanValue
-import io.airbyte.cdk.load.data.DateType
-import io.airbyte.cdk.load.data.DateValue
-import io.airbyte.cdk.load.data.IntegerType
 import io.airbyte.cdk.load.data.IntegerValue
 import io.airbyte.cdk.load.data.NullValue
-import io.airbyte.cdk.load.data.NumberType
 import io.airbyte.cdk.load.data.NumberValue
-import io.airbyte.cdk.load.data.ObjectType
-import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
-import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
 import io.airbyte.cdk.load.data.ObjectValue
-import io.airbyte.cdk.load.data.StringType
 import io.airbyte.cdk.load.data.StringValue
-import io.airbyte.cdk.load.data.TimeTypeWithTimezone
-import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone
-import io.airbyte.cdk.load.data.TimeWithTimezoneValue
-import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue
-import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
-import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
-import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
-import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
-import io.airbyte.cdk.load.data.UnionType
-import io.airbyte.cdk.load.data.UnknownType
-import java.time.Instant
-import java.time.LocalDateTime
-import java.time.ZoneOffset
 import org.apache.avro.generic.GenericArray
 import org.apache.avro.generic.GenericRecord
 import org.apache.avro.util.Utf8

-class AvroRecordToAirbyteValue {
-    fun convert(avroValue: Any?, schema: AirbyteType, top: Boolean = false): AirbyteValue {
-        if (avroValue == null) {
-            return NullValue
-        }
-        when (schema) {
-            is ObjectType -> {
-                val properties = LinkedHashMap<String, AirbyteValue>()
-                schema.properties.forEach { (name, field) ->
-                    val value = (avroValue as GenericRecord).get(name)
-                    if ((value != null) || top) {
-                        properties[name] = convert(value, field.type)
-                    }
-                }
-                return ObjectValue(properties)
-            }
-            is ArrayType -> {
-                val items = schema.items
-                val values = (avroValue as GenericArray<*>).map { convert(it, items.type) }
-                return ArrayValue(values)
-            }
-            is BooleanType -> return BooleanValue(avroValue as Boolean)
-            is DateType ->
-                return DateValue(
-                    LocalDateTime.ofInstant(
-                            Instant.ofEpochMilli((avroValue as Int).toLong() * 86400000),
-                            ZoneOffset.UTC
-                        )
-                        .toLocalDate()
-                )
-            is IntegerType -> return IntegerValue(avroValue as Long)
-            is NumberType -> return NumberValue((avroValue as Double).toBigDecimal())
-            is UnknownType -> return NullValue
-            ArrayTypeWithoutSchema,
-            ObjectTypeWithEmptySchema,
-            ObjectTypeWithoutSchema,
-            is StringType ->
-                return StringValue(
-                    when (avroValue) {
-                        is Utf8 -> avroValue.toString() // Avro
-                        is String -> avroValue // Avro via Parquet
-                        else ->
-                            throw IllegalArgumentException("Unsupported string type: $avroValue")
+object AvroRecordToAirbyteValue {
+    fun convert(avroValue: Any?): AirbyteValue {
+        return when (avroValue) {
+            null -> NullValue
+            is GenericRecord ->
+                ObjectValue(
+                    avroValue.schema.fields.associateTo(linkedMapOf()) { field ->
+                        field.name() to convert(avroValue.get(field.name()))
                     }
                 )
-            is TimeTypeWithoutTimezone ->
-                return TimeWithoutTimezoneValue(
-                    Instant.ofEpochMilli((avroValue as Long) / 1000)
-                        .atOffset(ZoneOffset.UTC)
-                        .toLocalTime()
-                )
-            is TimeTypeWithTimezone ->
-                return TimeWithTimezoneValue(
-                    Instant.ofEpochMilli((avroValue as Long) / 1000)
-                        .atOffset(ZoneOffset.UTC)
-                        .toOffsetTime()
-                )
-            is TimestampTypeWithoutTimezone ->
-                return TimestampWithoutTimezoneValue(
-                    LocalDateTime.ofInstant(
-                        Instant.ofEpochMilli((avroValue as Long) / 1000),
-                        ZoneOffset.UTC
-                    )
+            is GenericArray<*> -> ArrayValue(avroValue.map { convert(it) })
+            is Boolean -> BooleanValue(avroValue)
+            is Int -> IntegerValue(avroValue.toLong())
+            is Long -> IntegerValue(avroValue)
+            is Double -> NumberValue(avroValue.toBigDecimal())
+            is Utf8 -> StringValue(avroValue.toString())
+            is String -> StringValue(avroValue)
+            else ->
+                throw IllegalArgumentException(
+                    "Unrecognized avro value type: ${avroValue::class.qualifiedName} with value: $avroValue"
                 )
-            is TimestampTypeWithTimezone ->
-                return TimestampWithTimezoneValue(
-                    Instant.ofEpochMilli((avroValue as Long) / 1000).atOffset(ZoneOffset.UTC)
-                )
-            is UnionType -> return tryConvertUnion(avroValue, schema)
-            else -> throw IllegalArgumentException("Unsupported schema type: $schema")
-        }
-    }
-
-    private fun tryConvertUnion(avroValue: Any?, schema: UnionType): AirbyteValue {
-        for (type in schema.options) {
-            try {
-                return convert(avroValue, type)
-            } catch (e: Exception) {
-                continue
-            }
-        }
-        throw IllegalArgumentException("Could not convert value to any of the union types")
+        }
     }
 }

-fun GenericRecord.toAirbyteValue(schema: AirbyteType): AirbyteValue {
-    return AvroRecordToAirbyteValue().convert(this, schema, true)
+fun GenericRecord.toAirbyteValue(): AirbyteValue {
+    return AvroRecordToAirbyteValue.convert(this)
 }
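A usage sketch for the simplified converter above, building a GenericRecord in memory and converting it without any AirbyteType schema (the record shape is made up for illustration; assumes Avro and this test fixture on the classpath):

    import io.airbyte.cdk.load.data.avro.toAirbyteValue
    import org.apache.avro.SchemaBuilder
    import org.apache.avro.generic.GenericData

    fun main() {
        val schema =
            SchemaBuilder.record("test_stream")
                .namespace("test_namespace")
                .fields()
                .requiredLong("id")
                .requiredString("name")
                .endRecord()
        val record =
            GenericData.Record(schema).apply {
                put("id", 1L)
                put("name", "foo")
            }
        // Dispatches purely on runtime types; no AirbyteType schema needed anymore.
        // Yields ObjectValue(id=IntegerValue(1), name=StringValue("foo")).
        println(record.toAirbyteValue())
    }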
diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/file/avro/AvroReader.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/file/avro/AvroReader.kt
index 22f335b0abb3..8af3e8868093 100644
--- a/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/file/avro/AvroReader.kt
+++ b/airbyte-cdk/bulk/toolkits/load-avro/src/testFixtures/kotlin/io/airbyte/cdk/load/file/avro/AvroReader.kt
@@ -4,10 +4,10 @@

 package io.airbyte.cdk.load.file.avro

+import io.airbyte.cdk.load.command.DestinationStream
 import java.io.Closeable
 import java.io.InputStream
 import kotlin.io.path.outputStream
-import org.apache.avro.Schema
 import org.apache.avro.file.DataFileReader
 import org.apache.avro.generic.GenericDatumReader
 import org.apache.avro.generic.GenericRecord
@@ -34,12 +34,12 @@
     }
 }

-fun InputStream.toAvroReader(avroSchema: Schema): AvroReader {
-    val reader = GenericDatumReader<GenericRecord>(avroSchema)
+fun InputStream.toAvroReader(streamDescriptor: DestinationStream.Descriptor): AvroReader {
+    val reader = GenericDatumReader<GenericRecord>()
     val tmpFile =
         kotlin.io.path.createTempFile(
-            prefix = "${avroSchema.namespace}.${avroSchema.name}",
-            suffix = ".avro"
+            prefix = "${streamDescriptor.namespace}.${streamDescriptor.name}",
+            suffix = ".avro",
         )
     tmpFile.outputStream().use { outputStream -> this.copyTo(outputStream) }
     val file = tmpFile.toFile()
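With the Schema parameter gone, dumping an Avro object only needs the stream descriptor; the writer schema is read from the Avro file header itself. A usage sketch (the local file path is hypothetical):

    import io.airbyte.cdk.load.command.DestinationStream
    import io.airbyte.cdk.load.data.avro.toAirbyteValue
    import io.airbyte.cdk.load.file.avro.toAvroReader
    import java.io.File

    fun main() {
        val descriptor = DestinationStream.Descriptor("test_namespace", "test_stream")
        // The reader copies the stream to a temp file, then iterates its records.
        File("/tmp/part-0.avro").inputStream().toAvroReader(descriptor).use { reader ->
            reader.recordSequence().forEach { println(it.toAirbyteValue()) }
        }
    }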
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt
index 1a4b25070332..fdbb0062d279 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt
@@ -11,12 +11,9 @@ import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration
 import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfiguration
 import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfiguration
 import io.airbyte.cdk.load.command.object_storage.ParquetFormatConfiguration
-import io.airbyte.cdk.load.data.avro.AvroMapperPipelineFactory
 import io.airbyte.cdk.load.data.avro.toAirbyteValue
-import io.airbyte.cdk.load.data.avro.toAvroSchema
 import io.airbyte.cdk.load.data.csv.toAirbyteValue
 import io.airbyte.cdk.load.data.json.toAirbyteValue
-import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory
 import io.airbyte.cdk.load.data.withAirbyteMeta
 import io.airbyte.cdk.load.file.GZIPProcessor
 import io.airbyte.cdk.load.file.NoopProcessor
@@ -130,32 +127,18 @@ class ObjectStorageDataDumper(
                 }
             }
             is AvroFormatConfiguration -> {
-                val avroMapperPipeline = AvroMapperPipelineFactory().create(stream)
-                val finalSchema = avroMapperPipeline.finalSchema.withAirbyteMeta(wasFlattened)
-                inputStream.toAvroReader(finalSchema.toAvroSchema(stream.descriptor)).use { reader
-                    ->
+                inputStream.toAvroReader(stream.descriptor).use { reader ->
                     reader
                         .recordSequence()
-                        .map {
-                            it.toAirbyteValue(finalSchema)
-                                .maybeUnflatten(wasFlattened)
-                                .toOutputRecord()
-                        }
+                        .map { it.toAirbyteValue().maybeUnflatten(wasFlattened).toOutputRecord() }
                         .toList()
                 }
             }
             is ParquetFormatConfiguration -> {
-                val parquetMapperPipeline = ParquetMapperPipelineFactory().create(stream)
-                val finalSchema = parquetMapperPipeline.finalSchema.withAirbyteMeta(wasFlattened)
-                inputStream.toParquetReader(finalSchema.toAvroSchema(stream.descriptor)).use {
-                    reader ->
+                inputStream.toParquetReader(stream.descriptor).use { reader ->
                     reader
                         .recordSequence()
-                        .map {
-                            it.toAirbyteValue(finalSchema)
-                                .maybeUnflatten(wasFlattened)
-                                .toOutputRecord()
-                        }
+                        .map { it.toAirbyteValue().maybeUnflatten(wasFlattened).toOutputRecord() }
                         .toList()
                 }
             }
diff --git a/airbyte-cdk/bulk/toolkits/load-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/file/parquet/ParquetReader.kt b/airbyte-cdk/bulk/toolkits/load-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/file/parquet/ParquetReader.kt
index 384a2753bad2..f2f410967406 100644
--- a/airbyte-cdk/bulk/toolkits/load-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/file/parquet/ParquetReader.kt
+++ b/airbyte-cdk/bulk/toolkits/load-parquet/src/testFixtures/kotlin/io/airbyte/cdk/load/file/parquet/ParquetReader.kt
@@ -4,11 +4,11 @@

 package io.airbyte.cdk.load.file.parquet

+import io.airbyte.cdk.load.command.DestinationStream
 import java.io.Closeable
 import java.io.File
 import java.io.InputStream
 import kotlin.io.path.outputStream
-import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.avro.AvroParquetReader
@@ -31,11 +31,10 @@
     }
 }

-fun InputStream.toParquetReader(avroSchema: Schema): ParquetReader {
-
+fun InputStream.toParquetReader(descriptor: DestinationStream.Descriptor): ParquetReader {
     val tmpFile =
         kotlin.io.path.createTempFile(
-            prefix = "${avroSchema.namespace}.${avroSchema.name}",
+            prefix = "${descriptor.namespace}.${descriptor.name}",
             suffix = ".avro"
         )
     tmpFile.outputStream().use { outputStream -> this.copyTo(outputStream) }
diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt
index f2532207c27c..fed78ffa76dd 100644
--- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt
+++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt
@@ -39,6 +39,7 @@ abstract class IcebergV2WriteTest(
         supportFileTransfer = false,
         envVars = envVars,
         allTypesBehavior = StronglyTyped(integerCanBeLarge = false),
+        nullUnknownTypes = true,
         nullEqualsUnset = true,
     ) {
     @Test
@@ -80,12 +81,6 @@ abstract class IcebergV2WriteTest(
     override fun testDedupChangeCursor() {
         super.testDedupChangeCursor()
     }
-
-    @Test
-    @Disabled("https://github.com/airbytehq/airbyte-internal-issues/issues/11221")
-    override fun testUnknownTypes() {
-        super.testUnknownTypes()
-    }
 }

 class IcebergGlueWriteTest :
diff --git a/airbyte-integrations/connectors/destination-s3-v2/build.gradle b/airbyte-integrations/connectors/destination-s3-v2/build.gradle
index d370a2d14c0c..3f60f40462ff 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/build.gradle
+++ b/airbyte-integrations/connectors/destination-s3-v2/build.gradle
@@ -37,6 +37,9 @@ dependencies {
     // temporary dependencies so that we can continue running the legacy test suite.
     // eventually we should remove those tests + rely solely on the bulk CDK tests.
     integrationTestLegacyImplementation testFixtures(project(":airbyte-cdk:java:airbyte-cdk:airbyte-cdk-s3-destinations"))
+
+    // TODO this should come from the cdk plugin + respect the cdk version
+    integrationTestImplementation testFixtures(project(":airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-avro"))
 }

 // Exclude conflicting log4j-over-slf4j dependency
diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt
index 47fa340b0fe8..05d5480ac016 100644
--- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt
+++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt
@@ -4,9 +4,9 @@

 package io.airbyte.integrations.destination.s3_v2

+import io.airbyte.cdk.load.data.avro.AvroExpectedRecordMapper
 import io.airbyte.cdk.load.test.util.ExpectedRecordMapper
 import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
-import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
 import io.airbyte.cdk.load.test.util.UncoercedExpectedRecordMapper
 import io.airbyte.cdk.load.write.AllTypesBehavior
 import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
@@ -28,7 +28,7 @@ abstract class S3V2WriteTest(
     commitDataIncrementally: Boolean = true,
     allTypesBehavior: AllTypesBehavior,
     nullEqualsUnset: Boolean = false,
-    failOnUnknownTypes: Boolean = false,
+    nullUnknownTypes: Boolean = false,
     envVars: Map<String, String> = emptyMap(),
 ) :
     BasicFunctionalityIntegrationTest(
@@ -47,7 +47,7 @@ abstract class S3V2WriteTest(
         nullEqualsUnset = nullEqualsUnset,
         supportFileTransfer = true,
         envVars = envVars,
-        failOnUnknownTypes = failOnUnknownTypes,
+        nullUnknownTypes = nullUnknownTypes,
     ) {
     @Disabled("Irrelevant for file destinations")
     @Test
@@ -148,49 +148,54 @@ class S3V2WriteTestCsvGzip :

 class S3V2WriteTestAvroUncompressed :
     S3V2WriteTest(
         S3V2TestUtils.AVRO_UNCOMPRESSED_CONFIG_PATH,
-        NoopExpectedRecordMapper,
+        AvroExpectedRecordMapper,
         stringifySchemalessObjects = true,
         promoteUnionToObject = false,
         preserveUndeclaredFields = false,
         allTypesBehavior = StronglyTyped(integerCanBeLarge = false),
         nullEqualsUnset = true,
-        failOnUnknownTypes = true,
-    )
+        nullUnknownTypes = true,
+    ) {
+    @Test
+    override fun testUnknownTypes() {
+        super.testUnknownTypes()
+    }
+}

 class S3V2WriteTestAvroBzip2 :
     S3V2WriteTest(
         S3V2TestUtils.AVRO_BZIP2_CONFIG_PATH,
-        NoopExpectedRecordMapper,
+        AvroExpectedRecordMapper,
         stringifySchemalessObjects = true,
         promoteUnionToObject = false,
         preserveUndeclaredFields = false,
         allTypesBehavior = StronglyTyped(integerCanBeLarge = false),
         nullEqualsUnset = true,
-        failOnUnknownTypes = true,
+        nullUnknownTypes = true,
     )

 class S3V2WriteTestParquetUncompressed :
     S3V2WriteTest(
         S3V2TestUtils.PARQUET_UNCOMPRESSED_CONFIG_PATH,
-        NoopExpectedRecordMapper,
+        AvroExpectedRecordMapper,
         stringifySchemalessObjects = true,
         promoteUnionToObject = true,
         preserveUndeclaredFields = false,
         allTypesBehavior = StronglyTyped(integerCanBeLarge = false),
         nullEqualsUnset = true,
-        failOnUnknownTypes = true,
+        nullUnknownTypes = true,
     )

 class S3V2WriteTestParquetSnappy :
     S3V2WriteTest(
         S3V2TestUtils.PARQUET_SNAPPY_CONFIG_PATH,
-        NoopExpectedRecordMapper,
+        AvroExpectedRecordMapper,
         stringifySchemalessObjects = true,
         promoteUnionToObject = true,
         preserveUndeclaredFields = false,
         allTypesBehavior = StronglyTyped(integerCanBeLarge = false),
         nullEqualsUnset = true,
-        failOnUnknownTypes = true,
+        nullUnknownTypes = true,
     )

 class S3V2WriteTestEndpointURL :