From 5b6b2676b379ebc0eb44f0cdc0853b84c7c28363 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Mon, 23 Dec 2024 17:29:57 -0800 Subject: [PATCH] Destination S3V2: Guard against name conflicts --- .../toolkits/load-object-storage/build.gradle | 9 +++ .../ObjectStoragePathFactory.kt | 18 +++-- .../ObjectStorageDestinationStateManager.kt | 37 +++++++++-- .../ObjectStorageStreamLoaderFactory.kt | 2 +- .../object_storage/RecordToPartAccumulator.kt | 11 ++-- .../ObjectStoragePathFactoryUTest.kt | 48 ++++++++++++++ .../ObjectStorageDestinationStateTest.kt | 2 +- .../ObjectStorageDestinationStateUTest.kt | 65 +++++++++++++++++++ .../RecordToPartAccumulatorTest.kt | 15 +++-- .../io/airbyte/cdk/load/MockPathFactory.kt | 5 +- .../cdk/load/ObjectStorageDataDumper.kt | 4 +- .../destination-s3-v2/metadata.yaml | 5 ++ .../destination/s3_v2/S3V2TestUtils.kt | 1 + .../destination/s3_v2/S3V2WriteTest.kt | 9 +++ 14 files changed, 205 insertions(+), 26 deletions(-) create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateUTest.kt diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle b/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle index 47e3994e376e..aee9515324f8 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle @@ -16,3 +16,12 @@ dependencies { testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-csv")) testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-parquet")) } + +project.tasks.matching { + it.name == 'spotbugsIntegrationTestLegacy' || + it.name == 'spotbugsIntegrationTest' || + it.name == 'spotbugsTest' || + it.name == 'spotbugsMain' +}.configureEach { + enabled = false +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt index 3dbbc18911f8..36b1d9778a26 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactory.kt @@ -36,7 +36,7 @@ interface PathFactory { isStaging: Boolean = false, extension: String? = null ): String - fun getPathMatcher(stream: DestinationStream): PathMatcher + fun getPathMatcher(stream: DestinationStream, suffixPattern: String? = null): PathMatcher val supportsStaging: Boolean val prefix: String @@ -47,12 +47,13 @@ data class PathMatcher(val regex: Regex, val variableToIndex: Map) val match = regex.matchEntire(path) ?: return null return PathMatcherResult( path, - variableToIndex["part_number"]?.let { match.groupValues[it].toLong() } + variableToIndex["part_number"]?.let { match.groupValues[it].toLong() }, + variableToIndex["suffix"]?.let { match.groupValues[it].let { g -> g.ifBlank { null } } } ) } } -data class PathMatcherResult(val path: String, val partNumber: Long?) +data class PathMatcherResult(val path: String, val partNumber: Long?, val customSuffix: String?) @Singleton @Secondary @@ -370,7 +371,7 @@ class ObjectStoragePathFactory( } } - override fun getPathMatcher(stream: DestinationStream): PathMatcher { + override fun getPathMatcher(stream: DestinationStream, suffixPattern: String?): PathMatcher { val pathVariableToPattern = getPathVariableToPattern(stream) val variableToIndex = mutableMapOf() @@ -396,6 +397,13 @@ class ObjectStoragePathFactory( } else { "$prefix/$replacedForPath$replacedForFile" } - return PathMatcher(Regex(combined), variableToIndex) + val withSuffix = + if (suffixPattern != null) { + variableToIndex["suffix"] = variableToIndex.size + 1 + "$combined$suffixPattern" + } else { + combined + } + return PathMatcher(Regex(withSuffix), variableToIndex) } } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt index 54a649c1517a..fb5f6c8ef947 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateManager.kt @@ -13,6 +13,7 @@ import io.airbyte.cdk.load.file.object_storage.PathFactory import io.airbyte.cdk.load.file.object_storage.RemoteObject import io.airbyte.cdk.load.state.DestinationState import io.airbyte.cdk.load.state.DestinationStatePersister +import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState.Companion.OPTIONAL_ORDINAL_SUFFIX_PATTERN import io.airbyte.cdk.load.util.readIntoClass import io.airbyte.cdk.load.util.serializeToJsonBytes import io.github.oshai.kotlinlogging.KotlinLogging @@ -20,6 +21,7 @@ import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Secondary import jakarta.inject.Singleton import java.nio.file.Paths +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.toList import kotlinx.coroutines.sync.Mutex @@ -31,6 +33,7 @@ class ObjectStorageDestinationState( @JsonProperty("generations_by_state") var generationMap: MutableMap>> = mutableMapOf(), + @JsonProperty("count_by_key") var countByKey: MutableMap = mutableMapOf() ) : DestinationState { enum class State { STAGED, @@ -39,6 +42,9 @@ class ObjectStorageDestinationState( companion object { const val METADATA_GENERATION_ID_KEY = "ab-generation-id" + const val STREAM_NAMESPACE_KEY = "ab-stream-namespace" + const val STREAM_NAME_KEY = "ab-stream-name" + const val OPTIONAL_ORDINAL_SUFFIX_PATTERN = "(-[0-9]+)?" fun metadataFor(stream: DestinationStream): Map = mapOf(METADATA_GENERATION_ID_KEY to stream.generationId.toString()) @@ -123,6 +129,18 @@ class ObjectStorageDestinationState( it.objects.filter { obj -> obj.key !in keepKeys }.map { obj -> it.generationId to obj } } } + + /** Used to guarantee the uniqueness of a key */ + suspend fun ensureUnique(key: String): String { + val ordinal = + accessLock.withLock { countByKey.merge(key, 0L) { old, new -> maxOf(old + 1, new) } } + ?: 0L + return if (ordinal > 0L) { + "$key-$ordinal" + } else { + key + } + } } @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation") @@ -165,16 +183,22 @@ class ObjectStorageFallbackPersister( ) : DestinationStatePersister { private val log = KotlinLogging.logger {} override suspend fun load(stream: DestinationStream): ObjectStorageDestinationState { - val matcher = pathFactory.getPathMatcher(stream) + // Add a suffix matching an OPTIONAL -[0-9]+ ordinal + val matcher = + pathFactory.getPathMatcher(stream, suffixPattern = OPTIONAL_ORDINAL_SUFFIX_PATTERN) val longestUnambiguous = pathFactory.getLongestStreamConstantPrefix(stream, isStaging = false) log.info { "Searching path $longestUnambiguous (matching ${matcher.regex}) for destination state metadata" } - client - .list(longestUnambiguous) - .mapNotNull { matcher.match(it.key) } - .toList() + val matches = client.list(longestUnambiguous).mapNotNull { matcher.match(it.key) }.toList() + val countByKey = mutableMapOf() + matches.forEach { + val key = it.path.replace(Regex("-[0-9]+$"), "") + val ordinal = it.customSuffix?.substring(1)?.toLongOrNull() ?: 0 + countByKey.merge(key, ordinal) { a, b -> maxOf(a, b) } + } + matches .groupBy { client .getMetadata(it.path)[ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY] @@ -191,7 +215,8 @@ class ObjectStorageFallbackPersister( "Inferred state for generations with size: $generationSizes (minimum=${stream.minimumGenerationId}; current=${stream.generationId})" } return ObjectStorageDestinationState( - mutableMapOf(ObjectStorageDestinationState.State.FINALIZED to it) + mutableMapOf(ObjectStorageDestinationState.State.FINALIZED to it), + countByKey ) } } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt index 613df6db63e1..03e56093fcb4 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt @@ -99,7 +99,7 @@ class ObjectStorageStreamLoader, U : OutputStream>( fileSizeBytes = fileSizeBytes, stream, fileNumber - ) + ) { name -> destinationStateManager.getState(stream).ensureUnique(name) } } override suspend fun createFileBatchAccumulator( diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt index 6180f9b7ff62..cedba80734af 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulator.kt @@ -30,6 +30,7 @@ class RecordToPartAccumulator( private val fileSizeBytes: Long, private val stream: DestinationStream, private val fileNumber: AtomicLong, + private val fileNameMapper: suspend (String) -> String ) : BatchAccumulator { private val log = KotlinLogging.logger {} @@ -50,10 +51,12 @@ class RecordToPartAccumulator( partFactory = PartFactory( key = - pathFactory.getPathToFile( - stream, - fileNo, - isStaging = pathFactory.supportsStaging + fileNameMapper( + pathFactory.getPathToFile( + stream, + fileNo, + isStaging = pathFactory.supportsStaging + ) ), fileNumber = fileNo ), diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt new file mode 100644 index 000000000000..46e3e28a13af --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/file/object_storage/ObjectStoragePathFactoryUTest.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.file.object_storage + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfiguration +import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfigurationProvider +import io.airbyte.cdk.load.file.TimeProvider +import io.mockk.every +import io.mockk.impl.annotations.MockK +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +class ObjectStoragePathFactoryUTest { + @MockK lateinit var stream: DestinationStream + @MockK lateinit var pathConfigProvider: ObjectStoragePathConfigurationProvider + @MockK lateinit var timeProvider: TimeProvider + + @BeforeEach + fun setup() { + every { stream.descriptor } returns DestinationStream.Descriptor("test", "stream") + every { timeProvider.syncTimeMillis() } returns 0 + every { timeProvider.currentTimeMillis() } returns 1 + } + + @Test + fun `test matcher with suffix`() { + every { pathConfigProvider.objectStoragePathConfiguration } returns + ObjectStoragePathConfiguration( + "prefix", + null, + "path/", + "ambiguous_filename", + false, + ) + val factory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider) + + val matcher = factory.getPathMatcher(stream, "(-\\d+)?") + val match1 = matcher.match("prefix/path/ambiguous_filename") + assert(match1 != null) + assert(match1?.customSuffix == null) + val match2 = matcher.match("prefix/path/ambiguous_filename-1") + assert(match2 != null) + assert(match2?.customSuffix == "-1") + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateTest.kt index b8f244746b7c..de22e0674f3f 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateTest.kt @@ -34,7 +34,7 @@ class ObjectStorageDestinationStateTest { companion object { val stream1 = MockDestinationCatalogFactory.stream1 const val PERSISTED = - """{"generations_by_state":{"FINALIZED":{"0":{"key1":0,"key2":1},"1":{"key3":0,"key4":1}}}}""" + """{"generations_by_state":{"FINALIZED":{"0":{"key1":0,"key2":1},"1":{"key3":0,"key4":1}}},"count_by_key":{}}""" } @Singleton diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateUTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateUTest.kt new file mode 100644 index 000000000000..d920d1f4697d --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/state/object_storage/ObjectStorageDestinationStateUTest.kt @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.state.object_storage + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient +import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory +import io.airbyte.cdk.load.file.object_storage.PathMatcher +import io.airbyte.cdk.load.file.object_storage.RemoteObject +import io.mockk.coEvery +import io.mockk.every +import io.mockk.impl.annotations.MockK +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +class ObjectStorageDestinationStateUTest { + data class MockObj(override val key: String, override val storageConfig: Unit = Unit) : + RemoteObject + + @MockK lateinit var stream: DestinationStream + @MockK lateinit var client: ObjectStorageClient<*> + @MockK lateinit var pathFactory: ObjectStoragePathFactory + + @BeforeEach + fun setup() { + every { stream.descriptor } returns DestinationStream.Descriptor("test", "stream") + every { pathFactory.getPathMatcher(any(), any()) } answers + { + val suffix = secondArg() + PathMatcher(Regex("([a-z]+)$suffix"), mapOf("suffix" to 2)) + } + every { pathFactory.getLongestStreamConstantPrefix(any(), any()) } returns "prefix/" + } + + @Test + fun `test that the fallback persister correctly infers the unique key to ordinal count`() = + runTest { + coEvery { client.list(any()) } returns + flowOf( + MockObj("dog"), + MockObj("dog-1"), + MockObj("dog-3"), + MockObj("cat"), + MockObj("turtle-100") + ) + coEvery { client.getMetadata(any()) } returns mapOf("ab-generation-id" to "1") + + val persister = ObjectStorageFallbackPersister(client, pathFactory) + val state = persister.load(stream) + assert(state.countByKey["dog"] == 3L) + assert(state.countByKey["cat"] == 0L) + assert(state.countByKey["turtle"] == 100L) + + assert(state.ensureUnique("dog") == "dog-4") + assert(state.ensureUnique("dog") == "dog-5") + assert(state.ensureUnique("cat") == "cat-1") + assert(state.ensureUnique("turtle") == "turtle-101") + assert(state.ensureUnique("turtle") == "turtle-102") + assert(state.ensureUnique("spider") == "spider") + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt index 9b1d24b7a99c..1712ff0c8e05 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/write/object_storage/RecordToPartAccumulatorTest.kt @@ -66,7 +66,8 @@ class RecordToPartAccumulatorTest { partSizeBytes = partSizeBytes, fileSizeBytes = fileSizeBytes, stream = stream, - fileNumber = fileNumber + fileNumber = fileNumber, + fileNameMapper = { "$it!" }, ) val bufferSize = AtomicLong(0L) @@ -102,7 +103,7 @@ class RecordToPartAccumulatorTest { assert(batch.part.fileNumber == 111L) assert(!batch.isPersisted()) assert(!batch.part.isFinal) - assert(batch.part.key == "path.111") + assert(batch.part.key == "path.111!") } else -> assert(false) } @@ -115,7 +116,7 @@ class RecordToPartAccumulatorTest { assert(batch.part.fileNumber == 111L) assert(!batch.isPersisted()) assert(!batch.part.isFinal) - assert(batch.part.key == "path.111") + assert(batch.part.key == "path.111!") } else -> assert(false) } @@ -128,7 +129,7 @@ class RecordToPartAccumulatorTest { assert(batch.part.fileNumber == 111L) assert(!batch.isPersisted()) assert(!batch.part.isFinal) - assert(batch.part.key == "path.111") + assert(batch.part.key == "path.111!") } else -> assert(false) } @@ -142,7 +143,7 @@ class RecordToPartAccumulatorTest { assert(batch.part.fileNumber == 111L) assert(!batch.isPersisted()) assert(batch.part.isFinal) - assert(batch.part.key == "path.111") + assert(batch.part.key == "path.111!") } else -> assert(false) } @@ -157,7 +158,7 @@ class RecordToPartAccumulatorTest { assert(batch.part.fileNumber == 112L) assert(!batch.isPersisted()) assert(batch.part.isFinal) - assert(batch.part.key == "path.112") + assert(batch.part.key == "path.112!") } else -> assert(false) } @@ -171,7 +172,7 @@ class RecordToPartAccumulatorTest { assert(batch.part.fileNumber == 113L) assert(!batch.isPersisted()) assert(batch.part.isFinal) - assert(batch.part.key == "path.113") + assert(batch.part.key == "path.113!") } else -> assert(false) } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt index 581bf94dd248..c76239ebcec1 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/MockPathFactory.kt @@ -59,7 +59,10 @@ open class MockPathFactory : PathFactory { } } - override fun getPathMatcher(stream: DestinationStream): PathMatcher { + override fun getPathMatcher( + stream: DestinationStream, + suffixPattern: String? // ignored + ): PathMatcher { return PathMatcher( regex = Regex( diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt index 71edac9b3bd5..17697e1f7fba 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/testFixtures/kotlin/io/airbyte/cdk/load/ObjectStorageDataDumper.kt @@ -25,6 +25,7 @@ import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory import io.airbyte.cdk.load.file.object_storage.RemoteObject import io.airbyte.cdk.load.file.parquet.toParquetReader +import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState.Companion.OPTIONAL_ORDINAL_SUFFIX_PATTERN import io.airbyte.cdk.load.test.util.OutputRecord import io.airbyte.cdk.load.test.util.maybeUnflatten import io.airbyte.cdk.load.test.util.toOutputRecord @@ -53,7 +54,8 @@ class ObjectStorageDataDumper( // and the path matcher, so a failure here might imply a bug in the metadata-based // destination state loader, which lists by `prefix` and filters against the matcher. val prefix = pathFactory.getLongestStreamConstantPrefix(stream, isStaging = false) - val matcher = pathFactory.getPathMatcher(stream) + val matcher = + pathFactory.getPathMatcher(stream, suffixPattern = OPTIONAL_ORDINAL_SUFFIX_PATTERN) return runBlocking { withContext(Dispatchers.IO) { client diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index e6619fdb6026..70869b693b41 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -129,4 +129,9 @@ data: secretStore: type: GSM alias: airbyte-connector-testing-secret-store + - name: SECRET_DESTINATION-S3-V2_AMBIGUOUS_FILEPATH + fileName: s3_dest_v2_ambiguous_filepath_config.json + secretStore: + type: GSM + alias: airbyte-connector-testing-secret-store metadataSpecVersion: "1.0" diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt index 713abb1d6b6f..c9aa069f8268 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2TestUtils.kt @@ -23,5 +23,6 @@ object S3V2TestUtils { const val PARQUET_SNAPPY_CONFIG_PATH = "secrets/s3_dest_v2_parquet_snappy_config.json" const val ENDPOINT_URL_CONFIG_PATH = "secrets/s3_dest_v2_endpoint_url_config.json" const val ENDPOINT_EMPTY_URL_CONFIG_PATH = "secrets/s3_dest_v2_endpoint_empty_url_config.json" + const val AMBIGUOUS_FILEPATH_CONFIG_PATH = "secrets/s3_dest_v2_ambiguous_filepath_config.json" fun getConfig(configPath: String): String = Files.readString(Path.of(configPath)) } diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index 02169a3a85bd..47ea931aa7a6 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -206,3 +206,12 @@ class S3V2WriteTestEndpointURL : allTypesBehavior = Untyped, nullEqualsUnset = true, ) + +class S3V2AmbiguousFilepath : + S3V2WriteTest( + S3V2TestUtils.AMBIGUOUS_FILEPATH_CONFIG_PATH, + stringifySchemalessObjects = false, + promoteUnionToObject = false, + preserveUndeclaredFields = true, + allTypesBehavior = Untyped, + )