Skip to content

Commit

Permalink
Destination S3V2: Guard against name conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Dec 24, 2024
1 parent 0159fef commit 5b6b267
Show file tree
Hide file tree
Showing 14 changed files with 205 additions and 26 deletions.
9 changes: 9 additions & 0 deletions airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,12 @@ dependencies {
testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-csv"))
testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-parquet"))
}

project.tasks.matching {
it.name == 'spotbugsIntegrationTestLegacy' ||
it.name == 'spotbugsIntegrationTest' ||
it.name == 'spotbugsTest' ||
it.name == 'spotbugsMain'
}.configureEach {
enabled = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ interface PathFactory {
isStaging: Boolean = false,
extension: String? = null
): String
fun getPathMatcher(stream: DestinationStream): PathMatcher
fun getPathMatcher(stream: DestinationStream, suffixPattern: String? = null): PathMatcher

val supportsStaging: Boolean
val prefix: String
Expand All @@ -47,12 +47,13 @@ data class PathMatcher(val regex: Regex, val variableToIndex: Map<String, Int>)
val match = regex.matchEntire(path) ?: return null
return PathMatcherResult(
path,
variableToIndex["part_number"]?.let { match.groupValues[it].toLong() }
variableToIndex["part_number"]?.let { match.groupValues[it].toLong() },
variableToIndex["suffix"]?.let { match.groupValues[it].let { g -> g.ifBlank { null } } }
)
}
}

data class PathMatcherResult(val path: String, val partNumber: Long?)
data class PathMatcherResult(val path: String, val partNumber: Long?, val customSuffix: String?)

@Singleton
@Secondary
Expand Down Expand Up @@ -370,7 +371,7 @@ class ObjectStoragePathFactory(
}
}

override fun getPathMatcher(stream: DestinationStream): PathMatcher {
override fun getPathMatcher(stream: DestinationStream, suffixPattern: String?): PathMatcher {
val pathVariableToPattern = getPathVariableToPattern(stream)
val variableToIndex = mutableMapOf<String, Int>()

Expand All @@ -396,6 +397,13 @@ class ObjectStoragePathFactory(
} else {
"$prefix/$replacedForPath$replacedForFile"
}
return PathMatcher(Regex(combined), variableToIndex)
val withSuffix =
if (suffixPattern != null) {
variableToIndex["suffix"] = variableToIndex.size + 1
"$combined$suffixPattern"
} else {
combined
}
return PathMatcher(Regex(withSuffix), variableToIndex)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ import io.airbyte.cdk.load.file.object_storage.PathFactory
import io.airbyte.cdk.load.file.object_storage.RemoteObject
import io.airbyte.cdk.load.state.DestinationState
import io.airbyte.cdk.load.state.DestinationStatePersister
import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState.Companion.OPTIONAL_ORDINAL_SUFFIX_PATTERN
import io.airbyte.cdk.load.util.readIntoClass
import io.airbyte.cdk.load.util.serializeToJsonBytes
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.nio.file.Paths
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.sync.Mutex
Expand All @@ -31,6 +33,7 @@ class ObjectStorageDestinationState(
@JsonProperty("generations_by_state")
var generationMap: MutableMap<State, MutableMap<Long, MutableMap<String, Long>>> =
mutableMapOf(),
@JsonProperty("count_by_key") var countByKey: MutableMap<String, Long> = mutableMapOf()
) : DestinationState {
enum class State {
STAGED,
Expand All @@ -39,6 +42,9 @@ class ObjectStorageDestinationState(

companion object {
const val METADATA_GENERATION_ID_KEY = "ab-generation-id"
const val STREAM_NAMESPACE_KEY = "ab-stream-namespace"
const val STREAM_NAME_KEY = "ab-stream-name"
const val OPTIONAL_ORDINAL_SUFFIX_PATTERN = "(-[0-9]+)?"

fun metadataFor(stream: DestinationStream): Map<String, String> =
mapOf(METADATA_GENERATION_ID_KEY to stream.generationId.toString())
Expand Down Expand Up @@ -123,6 +129,18 @@ class ObjectStorageDestinationState(
it.objects.filter { obj -> obj.key !in keepKeys }.map { obj -> it.generationId to obj }
}
}

/** Used to guarantee the uniqueness of a key */
suspend fun ensureUnique(key: String): String {
val ordinal =
accessLock.withLock { countByKey.merge(key, 0L) { old, new -> maxOf(old + 1, new) } }
?: 0L
return if (ordinal > 0L) {
"$key-$ordinal"
} else {
key
}
}
}

@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION", justification = "Kotlin async continuation")
Expand Down Expand Up @@ -165,16 +183,22 @@ class ObjectStorageFallbackPersister(
) : DestinationStatePersister<ObjectStorageDestinationState> {
private val log = KotlinLogging.logger {}
override suspend fun load(stream: DestinationStream): ObjectStorageDestinationState {
val matcher = pathFactory.getPathMatcher(stream)
// Add a suffix matching an OPTIONAL -[0-9]+ ordinal
val matcher =
pathFactory.getPathMatcher(stream, suffixPattern = OPTIONAL_ORDINAL_SUFFIX_PATTERN)
val longestUnambiguous =
pathFactory.getLongestStreamConstantPrefix(stream, isStaging = false)
log.info {
"Searching path $longestUnambiguous (matching ${matcher.regex}) for destination state metadata"
}
client
.list(longestUnambiguous)
.mapNotNull { matcher.match(it.key) }
.toList()
val matches = client.list(longestUnambiguous).mapNotNull { matcher.match(it.key) }.toList()
val countByKey = mutableMapOf<String, Long>()
matches.forEach {
val key = it.path.replace(Regex("-[0-9]+$"), "")
val ordinal = it.customSuffix?.substring(1)?.toLongOrNull() ?: 0
countByKey.merge(key, ordinal) { a, b -> maxOf(a, b) }
}
matches
.groupBy {
client
.getMetadata(it.path)[ObjectStorageDestinationState.METADATA_GENERATION_ID_KEY]
Expand All @@ -191,7 +215,8 @@ class ObjectStorageFallbackPersister(
"Inferred state for generations with size: $generationSizes (minimum=${stream.minimumGenerationId}; current=${stream.generationId})"
}
return ObjectStorageDestinationState(
mutableMapOf(ObjectStorageDestinationState.State.FINALIZED to it)
mutableMapOf(ObjectStorageDestinationState.State.FINALIZED to it),
countByKey
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class ObjectStorageStreamLoader<T : RemoteObject<*>, U : OutputStream>(
fileSizeBytes = fileSizeBytes,
stream,
fileNumber
)
) { name -> destinationStateManager.getState(stream).ensureUnique(name) }
}

override suspend fun createFileBatchAccumulator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class RecordToPartAccumulator<U : OutputStream>(
private val fileSizeBytes: Long,
private val stream: DestinationStream,
private val fileNumber: AtomicLong,
private val fileNameMapper: suspend (String) -> String
) : BatchAccumulator {
private val log = KotlinLogging.logger {}

Expand All @@ -50,10 +51,12 @@ class RecordToPartAccumulator<U : OutputStream>(
partFactory =
PartFactory(
key =
pathFactory.getPathToFile(
stream,
fileNo,
isStaging = pathFactory.supportsStaging
fileNameMapper(
pathFactory.getPathToFile(
stream,
fileNo,
isStaging = pathFactory.supportsStaging
)
),
fileNumber = fileNo
),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.file.object_storage

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfiguration
import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfigurationProvider
import io.airbyte.cdk.load.file.TimeProvider
import io.mockk.every
import io.mockk.impl.annotations.MockK
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test

class ObjectStoragePathFactoryUTest {
@MockK lateinit var stream: DestinationStream
@MockK lateinit var pathConfigProvider: ObjectStoragePathConfigurationProvider
@MockK lateinit var timeProvider: TimeProvider

@BeforeEach
fun setup() {
every { stream.descriptor } returns DestinationStream.Descriptor("test", "stream")
every { timeProvider.syncTimeMillis() } returns 0
every { timeProvider.currentTimeMillis() } returns 1
}

@Test
fun `test matcher with suffix`() {
every { pathConfigProvider.objectStoragePathConfiguration } returns
ObjectStoragePathConfiguration(
"prefix",
null,
"path/",
"ambiguous_filename",
false,
)
val factory = ObjectStoragePathFactory(pathConfigProvider, null, null, timeProvider)

val matcher = factory.getPathMatcher(stream, "(-\\d+)?")
val match1 = matcher.match("prefix/path/ambiguous_filename")
assert(match1 != null)
assert(match1?.customSuffix == null)
val match2 = matcher.match("prefix/path/ambiguous_filename-1")
assert(match2 != null)
assert(match2?.customSuffix == "-1")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ObjectStorageDestinationStateTest {
companion object {
val stream1 = MockDestinationCatalogFactory.stream1
const val PERSISTED =
"""{"generations_by_state":{"FINALIZED":{"0":{"key1":0,"key2":1},"1":{"key3":0,"key4":1}}}}"""
"""{"generations_by_state":{"FINALIZED":{"0":{"key1":0,"key2":1},"1":{"key3":0,"key4":1}}},"count_by_key":{}}"""
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.state.object_storage

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
import io.airbyte.cdk.load.file.object_storage.PathMatcher
import io.airbyte.cdk.load.file.object_storage.RemoteObject
import io.mockk.coEvery
import io.mockk.every
import io.mockk.impl.annotations.MockK
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test

class ObjectStorageDestinationStateUTest {
data class MockObj(override val key: String, override val storageConfig: Unit = Unit) :
RemoteObject<Unit>

@MockK lateinit var stream: DestinationStream
@MockK lateinit var client: ObjectStorageClient<*>
@MockK lateinit var pathFactory: ObjectStoragePathFactory

@BeforeEach
fun setup() {
every { stream.descriptor } returns DestinationStream.Descriptor("test", "stream")
every { pathFactory.getPathMatcher(any(), any()) } answers
{
val suffix = secondArg<String>()
PathMatcher(Regex("([a-z]+)$suffix"), mapOf("suffix" to 2))
}
every { pathFactory.getLongestStreamConstantPrefix(any(), any()) } returns "prefix/"
}

@Test
fun `test that the fallback persister correctly infers the unique key to ordinal count`() =
runTest {
coEvery { client.list(any()) } returns
flowOf(
MockObj("dog"),
MockObj("dog-1"),
MockObj("dog-3"),
MockObj("cat"),
MockObj("turtle-100")
)
coEvery { client.getMetadata(any()) } returns mapOf("ab-generation-id" to "1")

val persister = ObjectStorageFallbackPersister(client, pathFactory)
val state = persister.load(stream)
assert(state.countByKey["dog"] == 3L)
assert(state.countByKey["cat"] == 0L)
assert(state.countByKey["turtle"] == 100L)

assert(state.ensureUnique("dog") == "dog-4")
assert(state.ensureUnique("dog") == "dog-5")
assert(state.ensureUnique("cat") == "cat-1")
assert(state.ensureUnique("turtle") == "turtle-101")
assert(state.ensureUnique("turtle") == "turtle-102")
assert(state.ensureUnique("spider") == "spider")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ class RecordToPartAccumulatorTest {
partSizeBytes = partSizeBytes,
fileSizeBytes = fileSizeBytes,
stream = stream,
fileNumber = fileNumber
fileNumber = fileNumber,
fileNameMapper = { "$it!" },
)

val bufferSize = AtomicLong(0L)
Expand Down Expand Up @@ -102,7 +103,7 @@ class RecordToPartAccumulatorTest {
assert(batch.part.fileNumber == 111L)
assert(!batch.isPersisted())
assert(!batch.part.isFinal)
assert(batch.part.key == "path.111")
assert(batch.part.key == "path.111!")
}
else -> assert(false)
}
Expand All @@ -115,7 +116,7 @@ class RecordToPartAccumulatorTest {
assert(batch.part.fileNumber == 111L)
assert(!batch.isPersisted())
assert(!batch.part.isFinal)
assert(batch.part.key == "path.111")
assert(batch.part.key == "path.111!")
}
else -> assert(false)
}
Expand All @@ -128,7 +129,7 @@ class RecordToPartAccumulatorTest {
assert(batch.part.fileNumber == 111L)
assert(!batch.isPersisted())
assert(!batch.part.isFinal)
assert(batch.part.key == "path.111")
assert(batch.part.key == "path.111!")
}
else -> assert(false)
}
Expand All @@ -142,7 +143,7 @@ class RecordToPartAccumulatorTest {
assert(batch.part.fileNumber == 111L)
assert(!batch.isPersisted())
assert(batch.part.isFinal)
assert(batch.part.key == "path.111")
assert(batch.part.key == "path.111!")
}
else -> assert(false)
}
Expand All @@ -157,7 +158,7 @@ class RecordToPartAccumulatorTest {
assert(batch.part.fileNumber == 112L)
assert(!batch.isPersisted())
assert(batch.part.isFinal)
assert(batch.part.key == "path.112")
assert(batch.part.key == "path.112!")
}
else -> assert(false)
}
Expand All @@ -171,7 +172,7 @@ class RecordToPartAccumulatorTest {
assert(batch.part.fileNumber == 113L)
assert(!batch.isPersisted())
assert(batch.part.isFinal)
assert(batch.part.key == "path.113")
assert(batch.part.key == "path.113!")
}
else -> assert(false)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ open class MockPathFactory : PathFactory {
}
}

override fun getPathMatcher(stream: DestinationStream): PathMatcher {
override fun getPathMatcher(
stream: DestinationStream,
suffixPattern: String? // ignored
): PathMatcher {
return PathMatcher(
regex =
Regex(
Expand Down
Loading

0 comments on commit 5b6b267

Please sign in to comment.