diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt index b315e41f4b67..94c0b5cfd2b3 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt @@ -22,6 +22,7 @@ import org.apache.iceberg.types.Types.* */ @Singleton class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesComparator) { + private val unsupportedTypeIds = setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO) /** * Returns a supertype for [existingType] and [incomingType] if one exists. @@ -67,11 +68,12 @@ class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesCom * @throws IllegalArgumentException if either type is unsupported. */ private fun validateTypeIds(typeId1: TypeID, typeId2: TypeID) { - val unsupportedTypeIds = setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO) - if (typeId1 in unsupportedTypeIds || typeId2 in unsupportedTypeIds) { - val badTypeId = if (typeId1 in unsupportedTypeIds) typeId1 else typeId2 + val providedTypes = listOf(typeId1, typeId2) + val foundUnsupported = providedTypes.filter { it in unsupportedTypeIds } + + if (foundUnsupported.isNotEmpty()) { throw IllegalArgumentException( - "Unsupported or unmapped Iceberg type: $badTypeId. Please implement handling if needed." + "Unsupported or unmapped Iceberg type(s): ${foundUnsupported.joinToString()}. Please implement handling if needed." ) } } @@ -93,14 +95,13 @@ class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesCom ): Type { val existingTypeId = existingType.typeId() val incomingTypeId = incomingType.typeId() - - validateTypeIds(existingTypeId, incomingTypeId) - // If promotion is not allowed by Iceberg, fail fast. if (!TypeUtil.isPromotionAllowed(existingType, incomingType)) { throwIllegalTypeCombination(existingType, incomingType, columnName) } + validateTypeIds(existingTypeId, incomingTypeId) + // If both are the same type ID, we just use the existing type if (existingTypeId == incomingTypeId) { // For timestamps, you'd want to reconcile UTC. This is simplified here. diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt index 71db8a87f6a1..80f0bdca6405 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt @@ -119,7 +119,9 @@ class IcebergSuperTypeFinderTest { // Fails in validateTypeIds => BINARY is not supported assertThatThrownBy { superTypeFinder.findSuperType(binaryType, intType, "column_name") } .isInstanceOf(IllegalArgumentException::class.java) - .hasMessageContaining("Unsupported or unmapped Iceberg type: BINARY") + .hasMessageContaining( + "Conversion for column \"column_name\" between binary and int is not allowed." + ) } @Test @@ -130,7 +132,9 @@ class IcebergSuperTypeFinderTest { // Fails in validateTypeIds => DECIMAL is not supported assertThatThrownBy { superTypeFinder.findSuperType(decimalType, intType, "column_name") } .isInstanceOf(IllegalArgumentException::class.java) - .hasMessageContaining("Unsupported or unmapped Iceberg type: DECIMAL") + .hasMessageContaining( + "Conversion for column \"column_name\" between decimal(10, 2) and int is not allowed." + ) } @Test @@ -141,7 +145,9 @@ class IcebergSuperTypeFinderTest { // Fails in validateTypeIds => FIXED is not supported assertThatThrownBy { superTypeFinder.findSuperType(fixedType, intType, "column_name") } .isInstanceOf(IllegalArgumentException::class.java) - .hasMessageContaining("Unsupported or unmapped Iceberg type: FIXED") + .hasMessageContaining( + "Conversion for column \"column_name\" between fixed[16] and int is not allowed." + ) } @Test @@ -152,7 +158,9 @@ class IcebergSuperTypeFinderTest { // Fails in validateTypeIds => UUID is not supported assertThatThrownBy { superTypeFinder.findSuperType(uuidType, intType, "column_name") } .isInstanceOf(IllegalArgumentException::class.java) - .hasMessageContaining("Unsupported or unmapped Iceberg type: UUID") + .hasMessageContaining( + "Conversion for column \"column_name\" between uuid and int is not allowed." + ) } @Test @@ -174,7 +182,6 @@ class IcebergSuperTypeFinderTest { superTypeFinder.findSuperType(nanoTimestamp, normalTimestamp, "column_name") } .isInstanceOf(IllegalArgumentException::class.java) - .hasMessageContaining("Unsupported or unmapped Iceberg type: TIMESTAMP_NANO") } @Test