Skip to content

Commit

Permalink
Merge branch 'iceberg-schema-supertype-finder' into iceberg-schema-up…
Browse files Browse the repository at this point in the history
…date-logic
  • Loading branch information
subodh1810 committed Dec 25, 2024
2 parents cf65854 + 3b39576 commit 5542700
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.iceberg.types.Types.*
*/
@Singleton
class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesComparator) {
private val unsupportedTypeIds = setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO)

/**
* Returns a supertype for [existingType] and [incomingType] if one exists.
Expand Down Expand Up @@ -67,11 +68,12 @@ class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesCom
* @throws IllegalArgumentException if either type is unsupported.
*/
private fun validateTypeIds(typeId1: TypeID, typeId2: TypeID) {
val unsupportedTypeIds = setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO)
if (typeId1 in unsupportedTypeIds || typeId2 in unsupportedTypeIds) {
val badTypeId = if (typeId1 in unsupportedTypeIds) typeId1 else typeId2
val providedTypes = listOf(typeId1, typeId2)
val foundUnsupported = providedTypes.filter { it in unsupportedTypeIds }

if (foundUnsupported.isNotEmpty()) {
throw IllegalArgumentException(
"Unsupported or unmapped Iceberg type: $badTypeId. Please implement handling if needed."
"Unsupported or unmapped Iceberg type(s): ${foundUnsupported.joinToString()}. Please implement handling if needed."
)
}
}
Expand All @@ -93,14 +95,13 @@ class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesCom
): Type {
val existingTypeId = existingType.typeId()
val incomingTypeId = incomingType.typeId()

validateTypeIds(existingTypeId, incomingTypeId)

// If promotion is not allowed by Iceberg, fail fast.
if (!TypeUtil.isPromotionAllowed(existingType, incomingType)) {
throwIllegalTypeCombination(existingType, incomingType, columnName)
}

validateTypeIds(existingTypeId, incomingTypeId)

// If both are the same type ID, we just use the existing type
if (existingTypeId == incomingTypeId) {
// For timestamps, you'd want to reconcile UTC. This is simplified here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ class IcebergSuperTypeFinderTest {
// Fails in validateTypeIds => BINARY is not supported
assertThatThrownBy { superTypeFinder.findSuperType(binaryType, intType, "column_name") }
.isInstanceOf(IllegalArgumentException::class.java)
.hasMessageContaining("Unsupported or unmapped Iceberg type: BINARY")
.hasMessageContaining(
"Conversion for column \"column_name\" between binary and int is not allowed."
)
}

@Test
Expand All @@ -130,7 +132,9 @@ class IcebergSuperTypeFinderTest {
// Fails in validateTypeIds => DECIMAL is not supported
assertThatThrownBy { superTypeFinder.findSuperType(decimalType, intType, "column_name") }
.isInstanceOf(IllegalArgumentException::class.java)
.hasMessageContaining("Unsupported or unmapped Iceberg type: DECIMAL")
.hasMessageContaining(
"Conversion for column \"column_name\" between decimal(10, 2) and int is not allowed."
)
}

@Test
Expand All @@ -141,7 +145,9 @@ class IcebergSuperTypeFinderTest {
// Fails in validateTypeIds => FIXED is not supported
assertThatThrownBy { superTypeFinder.findSuperType(fixedType, intType, "column_name") }
.isInstanceOf(IllegalArgumentException::class.java)
.hasMessageContaining("Unsupported or unmapped Iceberg type: FIXED")
.hasMessageContaining(
"Conversion for column \"column_name\" between fixed[16] and int is not allowed."
)
}

@Test
Expand All @@ -152,7 +158,9 @@ class IcebergSuperTypeFinderTest {
// Fails in validateTypeIds => UUID is not supported
assertThatThrownBy { superTypeFinder.findSuperType(uuidType, intType, "column_name") }
.isInstanceOf(IllegalArgumentException::class.java)
.hasMessageContaining("Unsupported or unmapped Iceberg type: UUID")
.hasMessageContaining(
"Conversion for column \"column_name\" between uuid and int is not allowed."
)
}

@Test
Expand All @@ -174,7 +182,6 @@ class IcebergSuperTypeFinderTest {
superTypeFinder.findSuperType(nanoTimestamp, normalTimestamp, "column_name")
}
.isInstanceOf(IllegalArgumentException::class.java)
.hasMessageContaining("Unsupported or unmapped Iceberg type: TIMESTAMP_NANO")
}

@Test
Expand Down

0 comments on commit 5542700

Please sign in to comment.