Skip to content

Commit

Permalink
Try add index
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Nov 20, 2024
1 parent 00a01ca commit c6c6b3b
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ interface DestinationTaskLauncher : TaskLauncher {
suspend fun handleNewBatch(stream: DestinationStream.Descriptor, wrapped: BatchEnvelope<*>)
suspend fun handleStreamClosed(stream: DestinationStream.Descriptor)
suspend fun handleTeardownComplete()
suspend fun handleFile(stream: DestinationStream.Descriptor, file: DestinationFile)
suspend fun handleFile(stream: DestinationStream.Descriptor, file: DestinationFile, index: Long)
}

/**
Expand Down Expand Up @@ -266,7 +266,7 @@ class DefaultDestinationTaskLauncher(
succeeded.send(true)
}

override suspend fun handleFile(stream: DestinationStream.Descriptor, file: DestinationFile) {
override suspend fun handleFile(stream: DestinationStream.Descriptor, file: DestinationFile, index: Long) {
enqueue(processFileTaskFactory.make(this, stream, file))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.load.task.implementor

import com.google.common.collect.Range
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.BatchEnvelope
import io.airbyte.cdk.load.message.DestinationFile
Expand All @@ -21,7 +22,8 @@ class DefaultProcessFileTask(
override val streamDescriptor: DestinationStream.Descriptor,
private val taskLauncher: DestinationTaskLauncher,
private val syncManager: SyncManager,
private val file: DestinationFile
private val file: DestinationFile,
private val index: Long,
) : ProcessFileTask {
val log = KotlinLogging.logger {}

Expand All @@ -30,7 +32,7 @@ class DefaultProcessFileTask(

val batch = streamLoader.processFile(file)

val wrapped = BatchEnvelope(batch)
val wrapped = BatchEnvelope(batch, Range.singleton(index))
taskLauncher.handleNewBatch(streamDescriptor, wrapped)
}
}
Expand All @@ -40,6 +42,7 @@ interface ProcessFileTaskFactory {
taskLauncher: DestinationTaskLauncher,
stream: DestinationStream.Descriptor,
file: DestinationFile,
index: Long,
): ProcessFileTask
}

Expand All @@ -52,6 +55,7 @@ class DefaultFileRecordsTaskFactory(
taskLauncher: DestinationTaskLauncher,
stream: DestinationStream.Descriptor,
file: DestinationFile,
index: Long,
): ProcessFileTask {
return DefaultProcessFileTask(stream, taskLauncher, syncManager, file)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ class DefaultInputConsumerTask(
is DestinationRecordStreamIncomplete ->
throw IllegalStateException("Stream $stream failed upstream, cannot continue.")
is DestinationFile -> {
destinationTaskLauncher.handleFile(stream, message)
manager.countRecordIn()
val index = manager.countRecordIn()
destinationTaskLauncher.handleFile(stream, message, index)
}
is DestinationFileStreamComplete -> {
reserved.release() // safe because multiple calls conflate
Expand Down

0 comments on commit c6c6b3b

Please sign in to comment.