[Load CDK] test improvements (#50406)
edgao authored Jan 3, 2025
1 parent f99a67a commit 022b316
Showing 9 changed files with 225 additions and 175 deletions.
@@ -68,7 +68,6 @@ import org.junit.jupiter.api.Assumptions.assumeTrue
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.api.assertThrows

sealed interface AllTypesBehavior

@@ -128,7 +127,7 @@ abstract class BasicFunctionalityIntegrationTest(
*/
val commitDataIncrementally: Boolean,
val allTypesBehavior: AllTypesBehavior,
val failOnUnknownTypes: Boolean = false,
val nullUnknownTypes: Boolean = false,
nullEqualsUnset: Boolean = false,
configUpdater: ConfigurationUpdater = FakeConfigurationUpdater,
) :
@@ -1954,13 +1953,15 @@ abstract class BasicFunctionalityIntegrationTest(

@Test
open fun testUnknownTypes() {
assumeTrue(verifyDataWriting)
val stream =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, "problematic_types"),
Append,
ObjectType(
linkedMapOf(
"id" to
"id" to intType,
"name" to
FieldType(
UnknownType(
JsonNodeFactory.instance.objectNode().put("type", "whatever")
@@ -1982,7 +1983,8 @@
"problematic_types",
"""
{
"id": "ex falso quodlibet"
"id": 1,
"name": "ex falso quodlibet"
}""".trimIndent(),
emittedAtMs = 1602637589100,
)
@@ -1996,28 +1998,41 @@
generationId = 42,
data =
mapOf(
"id" to "ex falso quodlibet",
"id" to 1,
"name" to
if (nullUnknownTypes) {
null
} else {
"ex falso quodlibet"
},
),
airbyteMeta =
OutputRecord.Meta(
syncId = 42,
changes =
if (nullUnknownTypes) {
listOf(
Change(
"name",
AirbyteRecordMessageMetaChange.Change.NULLED,
AirbyteRecordMessageMetaChange.Reason
.DESTINATION_SERIALIZATION_ERROR
)
)
} else {
emptyList()
}
),
airbyteMeta = OutputRecord.Meta(syncId = 42),
),
)

val dumpBlock = {
dumpAndDiffRecords(
parsedConfig,
expectedRecords,
stream,
primaryKey = listOf(listOf("id")),
cursor = null,
)
}
if (failOnUnknownTypes) {
// Note: this will not catch assertion errors against data
// if the destination actually succeeds (by design).
assertThrows<Exception> { dumpBlock() }
} else {
dumpBlock()
}
dumpAndDiffRecords(
parsedConfig,
expectedRecords,
stream,
primaryKey = listOf(listOf("id")),
cursor = null,
)
}

/**
@@ -2043,6 +2058,15 @@
// Our AirbyteType treats them identically, so we don't need two test cases.
"combined_type" to
FieldType(UnionType.of(StringType, IntegerType), nullable = true),
// For destinations which promote unions to objects,
// and also stringify schemaless values,
// we should verify that the promoted schemaless value
// is still labelled as "object" rather than "string".
"union_of_string_and_schemaless_type" to
FieldType(
UnionType.of(ObjectTypeWithoutSchema, IntegerType),
nullable = true,
),
"union_of_objects_with_properties_identical" to
FieldType(
UnionType.of(
@@ -2132,6 +2156,7 @@
{
"id": 1,
"combined_type": "string1",
"union_of_string_and_schemaless_type": {"foo": "bar"},
"union_of_objects_with_properties_identical": { "id": 10, "name": "Joe" },
"union_of_objects_with_properties_overlapping": { "id": 20, "name": "Jane", "flagged": true },
"union_of_objects_with_properties_contradicting": { "id": 1, "name": "Jenny" },
@@ -2188,6 +2213,15 @@
mapOf(
"id" to 1,
"combined_type" to maybePromote("string", "string1"),
"union_of_string_and_schemaless_type" to
maybePromote(
"object",
if (stringifySchemalessObjects) {
"""{"foo":"bar"}"""
} else {
mapOf("foo" to "bar")
}
),
"union_of_objects_with_properties_identical" to
mapOf("id" to 10, "name" to "Joe"),
"union_of_objects_with_properties_overlapping" to
@@ -2250,6 +2284,53 @@
)
}

/**
* Verify that we can handle a stream with 0 columns. This is... not particularly useful, but
* happens sometimes.
*/
open fun testNoColumns() {
val stream =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, "test_stream"),
Append,
ObjectType(linkedMapOf()),
generationId = 42,
minimumGenerationId = 0,
syncId = 42,
)
runSync(
configContents,
stream,
listOf(
InputRecord(
randomizedNamespace,
"test_stream",
"""{"foo": "bar"}""",
emittedAtMs = 1000L,
)
)
)
dumpAndDiffRecords(
parsedConfig,
listOf(
OutputRecord(
extractedAt = 1000L,
generationId = 42,
data =
if (preserveUndeclaredFields) {
mapOf("foo" to "bar")
} else {
emptyMap()
},
airbyteMeta = OutputRecord.Meta(syncId = 42),
)
),
stream,
primaryKey = listOf(),
cursor = null,
)
}

companion object {
private val intType = FieldType(IntegerType, nullable = true)
private val numberType = FieldType(NumberType, nullable = true)
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data.avro

import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.DateValue
import io.airbyte.cdk.load.data.IntegerValue
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.TimeWithTimezoneValue
import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
import io.airbyte.cdk.load.test.util.ExpectedRecordMapper
import io.airbyte.cdk.load.test.util.OutputRecord
import java.time.LocalDate
import java.time.ZoneOffset
import java.time.temporal.ChronoField
import java.time.temporal.TemporalAccessor

object AvroExpectedRecordMapper : ExpectedRecordMapper {
override fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord {
return expectedRecord.copy(data = timestampsToInteger(expectedRecord.data) as ObjectValue)
}

/**
* Avro doesn't have true temporal types. Instead, we write dates as epoch days, and other
* temporal types as epochMicros. Therefore, in expected records, we should convert from real
* temporal types to IntegerValue.
*/
private fun timestampsToInteger(value: AirbyteValue): AirbyteValue =
when (value) {
is DateValue -> IntegerValue(value.value.toEpochDay())
is TimestampWithTimezoneValue -> {
val micros = getMicros(value.value)
val epochSecond = value.value.toEpochSecond()
integerValue(epochSecond, micros)
}
is TimestampWithoutTimezoneValue -> {
val micros = getMicros(value.value)
val epochSecond = value.value.toEpochSecond(ZoneOffset.UTC)
integerValue(epochSecond, micros)
}
is TimeWithTimezoneValue -> {
val micros = getMicros(value.value)
val epochSecond = value.value.toEpochSecond(LocalDate.EPOCH)
integerValue(epochSecond, micros)
}
is TimeWithoutTimezoneValue -> {
val micros = getMicros(value.value)
val epochSecond = value.value.toEpochSecond(LocalDate.EPOCH, ZoneOffset.UTC)
integerValue(epochSecond, micros)
}
is ArrayValue -> ArrayValue(value.values.map { timestampsToInteger(it) })
is ObjectValue ->
ObjectValue(
value.values.mapValuesTo(linkedMapOf()) { (_, v) -> timestampsToInteger(v) }
)
else -> value
}

private fun getMicros(value: TemporalAccessor) = value.getLong(ChronoField.MICRO_OF_SECOND)

private fun integerValue(epochSecond: Long, micros: Long) =
IntegerValue(epochSecond * 1_000_000 + micros)
}
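
As a quick, standalone sanity check of the conversion above (a hypothetical snippet using only java.time, not code from this commit): the emittedAtMs value 1602637589100 used in the tests corresponds to 2020-10-14T01:06:29.100Z, which the mapper reduces to epoch microseconds, while a date collapses to its epoch day.

import java.time.LocalDate
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.time.temporal.ChronoField

fun main() {
    // 2020-10-14T01:06:29.100Z, i.e. the emittedAtMs (1602637589100) used in the tests above.
    val ts = OffsetDateTime.of(2020, 10, 14, 1, 6, 29, 100_000_000, ZoneOffset.UTC)
    // Same arithmetic as integerValue(): epochSecond * 1_000_000 + microOfSecond.
    val micros = ts.toEpochSecond() * 1_000_000 + ts.getLong(ChronoField.MICRO_OF_SECOND)
    println(micros) // 1602637589100000

    // Dates become epoch days, matching the DateValue branch of timestampsToInteger().
    println(LocalDate.of(2020, 10, 14).toEpochDay()) // 18549
}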