Skip to content

Commit

Permalink
Bulk load CDK: test refactors (#48488)
Browse files Browse the repository at this point in the history
Co-authored-by: Johnny Schmidt <[email protected]>
  • Loading branch information
edgao and johnny-schmidt authored Nov 19, 2024
1 parent 251626d commit 7cee0ba
Show file tree
Hide file tree
Showing 41 changed files with 613 additions and 370 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/bulk/core/load/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ dependencies {
testImplementation("io.mockk:mockk:1.13.12")
implementation "org.jetbrains.kotlin:kotlin-reflect:2.0.20"
testFixturesImplementation "uk.org.webcompere:system-stubs-jupiter:2.1.7"

implementation 'com.fasterxml.jackson.module:jackson-module-kotlin'
implementation 'com.fasterxml.jackson.module:jackson-module-afterburner'
}

def integrationTestTask = tasks.register('integrationTest', Test) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class MockBasicFunctionalityIntegrationTest :
}

@Test
override fun testAllTypes() {
super.testAllTypes()
override fun testBasicTypes() {
super.testBasicTypes()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package io.airbyte.cdk.load.data

import com.fasterxml.jackson.databind.JsonNode
import java.math.BigDecimal
import java.math.BigInteger
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
Expand All @@ -23,6 +24,7 @@ sealed interface AirbyteValue {
is Boolean -> BooleanValue(value)
is Int -> IntegerValue(value.toLong())
is Long -> IntegerValue(value)
is BigInteger -> IntegerValue(value)
is Double -> NumberValue(BigDecimal.valueOf(value))
is BigDecimal -> NumberValue(value)
is LocalDate -> DateValue(value.toString())
Expand Down Expand Up @@ -60,7 +62,8 @@ value class BooleanValue(val value: Boolean) : AirbyteValue, Comparable<BooleanV
}

@JvmInline
value class IntegerValue(val value: Long) : AirbyteValue, Comparable<IntegerValue> {
value class IntegerValue(val value: BigInteger) : AirbyteValue, Comparable<IntegerValue> {
constructor(value: Long) : this(BigInteger.valueOf(value))
override fun compareTo(other: IntegerValue): Int = value.compareTo(other.value)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper {
it.first to it.second.changes.toList()
}

/**
 * Records that the field at the current [context] path was nulled, then maps [NullValue] in its
 * place.
 *
 * A [DestinationRecord.Change] of kind [Change.NULLED] is appended to `context.changes`, keyed by
 * the dot-joined `context.path`, before delegating to [mapInner] with [NullValue].
 *
 * @param schema the type the nulled field is mapped against
 * @param context the mapping context whose change list is mutated
 * @param reason the reason recorded on the change; defaults to
 * [Reason.DESTINATION_SERIALIZATION_ERROR]
 * @return the result of mapping [NullValue] against [schema]
 */
fun nulledOut(
    schema: AirbyteType,
    context: Context,
    reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR
): Pair<AirbyteValue, Context> {
    val fieldPath = context.path.joinToString(".")
    val nulledChange = DestinationRecord.Change(fieldPath, Change.NULLED, reason)
    context.changes.add(nulledChange)
    return mapInner(NullValue, schema, context)
}

fun mapInner(
value: AirbyteValue,
schema: AirbyteType,
Expand Down Expand Up @@ -69,32 +80,23 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper {
is NumberType -> mapNumber(value as NumberValue, context)
is StringType -> mapString(value as StringValue, context)
is IntegerType -> mapInteger(value as IntegerValue, context)
is DateType -> mapDate(value as DateValue, context)
is DateType -> mapDate(value, context)
is TimeTypeWithTimezone ->
mapTimeWithTimezone(
value as TimeValue,
value,
context,
)
is TimeTypeWithoutTimezone ->
mapTimeWithoutTimezone(
value as TimeValue,
value,
context,
)
is TimestampTypeWithTimezone ->
mapTimestampWithTimezone(value as TimestampValue, context)
is TimestampTypeWithoutTimezone ->
mapTimestampWithoutTimezone(value as TimestampValue, context)
is TimestampTypeWithTimezone -> mapTimestampWithTimezone(value, context)
is TimestampTypeWithoutTimezone -> mapTimestampWithoutTimezone(value, context)
is UnknownType -> mapUnknown(value as UnknownValue, context)
}
} catch (e: Exception) {
context.changes.add(
DestinationRecord.Change(
context.path.joinToString("."),
Change.NULLED,
Reason.DESTINATION_SERIALIZATION_ERROR
)
)
mapInner(NullValue, schema, context)
nulledOut(schema, context)
}

open fun mapObject(
Expand Down Expand Up @@ -171,24 +173,30 @@ open class AirbyteValueIdentityMapper : AirbyteValueMapper {
open fun mapInteger(value: IntegerValue, context: Context): Pair<AirbyteValue, Context> =
value to context

open fun mapDate(value: DateValue, context: Context): Pair<AirbyteValue, Context> =
/**
* Time types are only allowed to be strings on the wire, but can be [IntValue] or [IntegerValue]
* if passed through [TimeStringToInteger].
*/
open fun mapDate(value: AirbyteValue, context: Context): Pair<AirbyteValue, Context> =
value to context

open fun mapTimeWithTimezone(value: TimeValue, context: Context): Pair<AirbyteValue, Context> =
value to context
open fun mapTimeWithTimezone(
value: AirbyteValue,
context: Context
): Pair<AirbyteValue, Context> = value to context

open fun mapTimeWithoutTimezone(
value: TimeValue,
value: AirbyteValue,
context: Context
): Pair<AirbyteValue, Context> = value to context

open fun mapTimestampWithTimezone(
value: TimestampValue,
value: AirbyteValue,
context: Context
): Pair<AirbyteValue, Context> = value to context

open fun mapTimestampWithoutTimezone(
value: TimestampValue,
value: AirbyteValue,
context: Context
): Pair<AirbyteValue, Context> = value to context

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import java.math.BigInteger

/**
 * Mapper that replaces out-of-range integers with null.
 *
 * An [IntegerValue] whose value lies outside the inclusive range [minValue]..[maxValue] is nulled
 * via [nulledOut] with reason `DESTINATION_FIELD_SIZE_LIMITATION`; any other integer is passed to
 * the parent mapper unchanged. By default the range is that of a 64-bit signed integer.
 *
 * @param minValue smallest integer that is kept (inclusive)
 * @param maxValue largest integer that is kept (inclusive)
 */
class NullOutOfRangeIntegers(
    private val minValue: BigInteger = Long.MIN_VALUE.toBigInteger(),
    private val maxValue: BigInteger = Long.MAX_VALUE.toBigInteger()
) : AirbyteValueIdentityMapper() {
    override fun mapInteger(value: IntegerValue, context: Context): Pair<AirbyteValue, Context> {
        val candidate = value.value
        val withinBounds = candidate >= minValue && candidate <= maxValue
        return if (withinBounds) {
            super.mapInteger(value, context)
        } else {
            nulledOut(
                IntegerType,
                context,
                AirbyteRecordMessageMetaChange.Reason.DESTINATION_FIELD_SIZE_LIMITATION
            )
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class SchemalessTypesToJsonString : AirbyteSchemaIdentityMapper {
override fun mapObjectWithEmptySchema(schema: ObjectTypeWithEmptySchema): AirbyteType =
StringType
override fun mapArrayWithoutSchema(schema: ArrayTypeWithoutSchema): AirbyteType = StringType
override fun mapUnknown(schema: UnknownType): AirbyteType = StringType
}

class SchemalessValuesToJsonString : AirbyteValueIdentityMapper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class TimeStringToInteger : AirbyteValueIdentityMapper() {
)
}

override fun mapDate(value: DateValue, context: Context): Pair<AirbyteValue, Context> {
override fun mapDate(value: AirbyteValue, context: Context): Pair<AirbyteValue, Context> {
value as DateValue
val epochDay = LocalDate.parse(value.value, DATE_TIME_FORMATTER).toEpochDay()
return IntValue(epochDay.toInt()) to context
}
Expand All @@ -53,14 +54,16 @@ class TimeStringToInteger : AirbyteValueIdentityMapper() {
}

override fun mapTimeWithTimezone(
value: TimeValue,
value: AirbyteValue,
context: Context
): Pair<AirbyteValue, Context> = IntegerValue(toMicrosOfDay(value.value)) to context
): Pair<AirbyteValue, Context> =
IntegerValue(toMicrosOfDay((value as TimeValue).value)) to context

override fun mapTimeWithoutTimezone(
value: TimeValue,
value: AirbyteValue,
context: Context
): Pair<AirbyteValue, Context> = IntegerValue(toMicrosOfDay(value.value)) to context
): Pair<AirbyteValue, Context> =
IntegerValue(toMicrosOfDay((value as TimeValue).value)) to context

private fun toEpochMicrosWithTimezone(timestampString: String): Long {
val zdt = ZonedDateTime.parse(timestampString, DATE_TIME_FORMATTER)
Expand All @@ -83,11 +86,13 @@ class TimeStringToInteger : AirbyteValueIdentityMapper() {
}

override fun mapTimestampWithTimezone(
value: TimestampValue,
value: AirbyteValue,
context: Context
): Pair<AirbyteValue, Context> = IntegerValue(toEpochMicros(value.value)) to context
): Pair<AirbyteValue, Context> =
IntegerValue(toEpochMicros((value as TimestampValue).value)) to context
override fun mapTimestampWithoutTimezone(
value: TimestampValue,
value: AirbyteValue,
context: Context
): Pair<AirbyteValue, Context> = IntegerValue(toEpochMicros(value.value)) to context
): Pair<AirbyteValue, Context> =
IntegerValue(toEpochMicros((value as TimestampValue).value)) to context
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ package io.airbyte.cdk.load.data.json

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.load.data.*
import io.airbyte.cdk.util.Jsons
import io.airbyte.cdk.load.util.serializeToString
import java.math.BigDecimal
import java.math.BigInteger

/**
* Converts from json to airbyte value, performing the minimum validation necessary to marshal to a
Expand Down Expand Up @@ -71,7 +72,7 @@ class JsonToAirbyteValue {
return if (json.isTextual) {
StringValue(json.asText())
} else {
StringValue(Jsons.writeValueAsString(json))
StringValue(json.serializeToString())
}
}

Expand All @@ -90,10 +91,10 @@ class JsonToAirbyteValue {
private fun toInteger(json: JsonNode): IntegerValue {
val longVal =
when {
json.isBoolean -> if (json.asBoolean()) 1L else 0L
json.isIntegralNumber -> json.asLong()
json.isFloatingPointNumber -> json.asDouble().toLong()
json.isTextual -> json.asText().toLong()
json.isBoolean -> if (json.asBoolean()) BigInteger.ONE else BigInteger.ZERO
json.isIntegralNumber -> json.bigIntegerValue()
json.isFloatingPointNumber -> json.bigIntegerValue()
json.isTextual -> json.asText().toBigInteger()
else -> throw IllegalArgumentException("Could not convert $json to Integer")
}
return IntegerValue(longVal)
Expand All @@ -103,8 +104,8 @@ class JsonToAirbyteValue {
val numVal =
when {
json.isBoolean -> BigDecimal(if (json.asBoolean()) 1.0 else 0.0)
json.isIntegralNumber -> json.asLong().toBigDecimal()
json.isFloatingPointNumber -> json.asDouble().toBigDecimal()
json.isIntegralNumber -> json.decimalValue()
json.isFloatingPointNumber -> json.decimalValue()
json.isTextual -> json.asText().toBigDecimal()
else -> throw IllegalArgumentException("Could not convert $json to Number")
}
Expand Down Expand Up @@ -150,7 +151,7 @@ class JsonToAirbyteValue {
return convert(json, option)
}

private fun fromJson(json: JsonNode): AirbyteValue {
fun fromJson(json: JsonNode): AirbyteValue {
return when {
json.isBoolean -> toBoolean(json)
json.isIntegralNumber -> toInteger(json)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import io.airbyte.cdk.load.data.json.AirbyteValueToJson
import io.airbyte.cdk.load.data.json.JsonToAirbyteValue
import io.airbyte.cdk.load.message.CheckpointMessage.Checkpoint
import io.airbyte.cdk.load.message.CheckpointMessage.Stats
import io.airbyte.protocol.models.Jsons
import io.airbyte.cdk.load.util.deserializeToNode
import io.airbyte.protocol.models.v0.AirbyteGlobalState
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
Expand Down Expand Up @@ -62,7 +62,7 @@ data class DestinationRecord(
changes: MutableList<Change> = mutableListOf(),
) : this(
stream = DestinationStream.Descriptor(namespace, name),
data = JsonToAirbyteValue().convert(Jsons.deserialize(data), ObjectTypeWithoutSchema),
data = JsonToAirbyteValue().convert(data.deserializeToNode(), ObjectTypeWithoutSchema),
emittedAtMs = emittedAtMs,
meta = Meta(changes),
serialized = "",
Expand Down Expand Up @@ -287,7 +287,7 @@ data class StreamCheckpoint(
) : this(
Checkpoint(
DestinationStream.Descriptor(streamNamespace, streamName),
state = Jsons.deserialize(blob)
state = blob.deserializeToNode()
),
Stats(sourceRecordCount),
destinationRecordCount?.let { Stats(it) },
Expand Down Expand Up @@ -318,7 +318,7 @@ data class GlobalCheckpoint(
blob: String,
sourceRecordCount: Long,
) : this(
state = Jsons.deserialize(blob),
state = blob.deserializeToNode(),
Stats(sourceRecordCount),
additionalProperties = emptyMap(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package io.airbyte.cdk.load.message

import io.airbyte.cdk.util.Jsons
import io.airbyte.cdk.load.util.deserializeToClass
import io.airbyte.protocol.models.v0.AirbyteMessage
import jakarta.inject.Singleton

Expand All @@ -22,7 +22,7 @@ class DefaultDestinationMessageDeserializer(private val messageFactory: Destinat

override fun deserialize(serialized: String): DestinationMessage {
try {
val airbyteMessage = Jsons.readValue(serialized, AirbyteMessage::class.java)
val airbyteMessage = serialized.deserializeToClass(AirbyteMessage::class.java)
return messageFactory.fromAirbyteMessage(airbyteMessage, serialized)
} catch (t: Throwable) {
throw RuntimeException("Failed to deserialize AirbyteMessage")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,55 @@

package io.airbyte.cdk.load.util

import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.StreamReadConstraints
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.util.Jsons
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.afterburner.AfterburnerModule
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.io.InputStream

fun JsonNode.serializeToString(): String {
/**
 * Shared, pre-configured Jackson [ObjectMapper] used by the JSON helper extensions in this file.
 *
 * Configuration applied at initialization:
 * - Kotlin, Java time and Afterburner modules are registered.
 * - Null-valued properties are omitted on serialization.
 * - Unknown properties are ignored on deserialization.
 * - Floating-point numbers are read as BigDecimal and written in plain (non-scientific) notation.
 * - The maximum readable string length is raised to [JSON_MAX_LENGTH].
 */
object Jsons : ObjectMapper() {
    // Raise jackson's string-length read limit to 100 MiB.
    // (The default, at time of writing 2024-05-29, with jackson 2.15.2, is 20 MiB.)
    private const val JSON_MAX_LENGTH = 100 * 1024 * 1024

    init {
        // Modules are registered in the same order as before.
        registerKotlinModule()
        registerModule(JavaTimeModule())
        registerModule(AfterburnerModule())

        setSerializationInclusion(JsonInclude.Include.NON_NULL)

        disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
        enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS)
        enable(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN)

        val readConstraints =
            StreamReadConstraints.builder().maxStringLength(JSON_MAX_LENGTH).build()
        factory.setStreamReadConstraints(readConstraints)
    }
}

/** Serializes the receiver to a JSON string using the shared [Jsons] mapper. */
fun <T> T.serializeToString(): String = Jsons.writeValueAsString(this)

/**
 * Reads the stream as a JSON tree with the shared [Jsons] mapper, then converts that tree to an
 * instance of [klass].
 */
fun <T> InputStream.readIntoClass(klass: Class<T>): T {
    val tree = Jsons.readTree(this)
    return Jsons.treeToValue(tree, klass)
}

/**
 * Writes the receiver as a pretty-printed JSON string using the shared [Jsons] mapper.
 *
 * Note: despite the "deserialize" in its name, this function serializes the receiver.
 */
fun <T> T.deserializeToPrettyPrintedString(): String =
    Jsons.writerWithDefaultPrettyPrinter().writeValueAsString(this)

/** Parses the receiver as JSON into a [JsonNode] tree using the shared [Jsons] mapper. */
fun String.deserializeToNode(): JsonNode = Jsons.readTree(this)

/** Parses the receiver as JSON into an instance of [klass] using the shared [Jsons] mapper. */
fun <T> String.deserializeToClass(klass: Class<T>): T = Jsons.readValue(this, klass)

/** Serializes the receiver to JSON as a byte array using the shared [Jsons] mapper. */
fun Any.serializeToJsonBytes(): ByteArray = Jsons.writeValueAsBytes(this)
Loading

0 comments on commit 7cee0ba

Please sign in to comment.