Skip to content

Commit

Permalink
[WIP] Prerelease S3V2 Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Dec 24, 2024
1 parent 671d4de commit 0159fef
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ interface QueueWriter<T> : CloseableCoroutine {
interface MessageQueue<T> : QueueReader<T>, QueueWriter<T>

abstract class ChannelMessageQueue<T> : MessageQueue<T> {
open val channel = Channel<T>(Channel.UNLIMITED)
open val channel: Channel<T> = Channel(Channel.UNLIMITED)

override suspend fun publish(message: T) = channel.send(message)
override suspend fun consume(): Flow<T> = channel.receiveAsFlow()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import jakarta.inject.Singleton
import java.io.File
import java.io.OutputStream
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take

@Singleton
@Secondary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.ByteArrayOutputStream
import java.io.OutputStream
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import kotlin.time.measureTime
import kotlin.time.measureTimedValue
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -51,6 +54,7 @@ class S3MultipartUpload<T : OutputStream>(
private val wrappingBuffer = streamProcessor.wrapper(underlyingBuffer)
private val partQueue = Channel<ByteArray>(Channel.UNLIMITED)
private val isClosed = AtomicBoolean(false)
private val channelSize = AtomicLong(0L)

/**
* Run the upload using the provided block. This should only be used by the
Expand All @@ -73,6 +77,7 @@ class S3MultipartUpload<T : OutputStream>(
launch {
val uploadedParts = mutableListOf<CompletedPart>()
for (bytes in partQueue) {
channelSize.decrementAndGet()
val part = uploadPart(bytes, uploadedParts)
uploadedParts.add(part)
}
Expand Down Expand Up @@ -117,7 +122,11 @@ class S3MultipartUpload<T : OutputStream>(
wrappingBuffer.flush()
val bytes = underlyingBuffer.toByteArray()
underlyingBuffer.reset()
runBlocking { partQueue.send(bytes) }
channelSize.incrementAndGet()
val duration = measureTime { runBlocking { partQueue.send(bytes) } }
log.info {
"Enqueued part in $duration (channelSize = ${channelSize.get()}; uploadId = ${response.uploadId})"
}
}

private suspend fun uploadPart(
Expand All @@ -132,10 +141,13 @@ class S3MultipartUpload<T : OutputStream>(
body = ByteStream.fromBytes(bytes)
this.partNumber = partNumber
}
val uploadResponse = client.uploadPart(request)
val uploadResponse = measureTimedValue { client.uploadPart(request) }
log.info {
"Uploaded part $partNumber in ${uploadResponse.duration} (channelSize = ${channelSize.get()}; uploadId = ${response.uploadId})"
}
return CompletedPart {
this.partNumber = partNumber
this.eTag = uploadResponse.eTag
this.eTag = uploadResponse.value.eTag
}
}

Expand Down
16 changes: 8 additions & 8 deletions airbyte-integrations/connectors/destination-s3-v2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ application {
// Uncomment to run locally:
// '--add-opens', 'java.base/java.lang=ALL-UNNAMED'
// Uncomment to enable remote profiling:
// '-XX:NativeMemoryTracking=detail',
// '-Djava.rmi.server.hostname=localhost',
// '-Dcom.sun.management.jmxremote=true',
// '-Dcom.sun.management.jmxremote.port=6000',
// '-Dcom.sun.management.jmxremote.rmi.port=6000',
// '-Dcom.sun.management.jmxremote.local.only=false',
// '-Dcom.sun.management.jmxremote.authenticate=false',
// '-Dcom.sun.management.jmxremote.ssl=false'
'-XX:NativeMemoryTracking=detail',
'-Djava.rmi.server.hostname=localhost',
'-Dcom.sun.management.jmxremote=true',
'-Dcom.sun.management.jmxremote.port=6000',
'-Dcom.sun.management.jmxremote.rmi.port=6000',
'-Dcom.sun.management.jmxremote.local.only=false',
'-Dcom.sun.management.jmxremote.authenticate=false',
'-Dcom.sun.management.jmxremote.ssl=false'
]
}

Expand Down
35 changes: 24 additions & 11 deletions airbyte-integrations/connectors/destination-s3-v2/metadata.yaml
Original file line number Diff line number Diff line change
@@ -1,27 +1,40 @@
data:
connectorSubtype: file
connectorType: destination
definitionId: d6116991-e809-4c7c-ae09-c64712df5b66
dockerImageTag: 0.3.5
dockerRepository: airbyte/destination-s3-v2
githubIssueLabel: destination-s3-v2
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.5.0
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
license: ELv2
name: S3 V2 Destination
name: S3
registryOverrides:
cloud:
enabled: false
enabled: true
oss:
enabled: false
releaseStage: alpha
enabled: true
releaseStage: generally_available
releases:
breakingChanges:
1.0.0:
message: >
**This release includes breaking changes, including major revisions to the schema of stored data. Do not upgrade without reviewing the migration guide.**
upgradeDeadline: "2024-10-08"
resourceRequirements:
jobSpecific:
- jobType: sync
resourceRequirements:
memory_limit: 2Gi
memory_request: 2Gi
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3
tags:
- language:java
ab_internal:
sl: 100
ql: 100
supportLevel: community
sl: 300
ql: 300
supportLevel: certified
supportsRefreshes: true
supportsFileTransfer: true
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.destination.s3_v2

import com.fasterxml.jackson.annotation.JsonProperty
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import io.airbyte.cdk.command.ConfigurationSpecification
Expand Down Expand Up @@ -81,6 +82,12 @@ class S3V2Specification :
//
// @get:JsonSchemaInject(json = """{"examples":["__staging/data_sync/test"],"order":11}""")
// override val s3StagingPrefix: String? = null

@get:JsonProperty("num_process_records_workers")
val numProcessRecordsWorkers: Int? = 2

@get:JsonProperty("estimated_record_memory_overhead_ratio")
val estimatedRecordMemoryOverheadRatio: Double? = 5.0
}

@Singleton
Expand Down
24 changes: 12 additions & 12 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -544,18 +544,18 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer |
| 1.3.0 | 2024-09-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | fix tests |
| 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields |
| 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams |
| 1.1.0 | 2024-09-18 | [45436](https://github.com/airbytehq/airbyte/pull/45436) | upgrade all dependencies |
| 1.0.5 | 2024-09-05 | [45143](https://github.com/airbytehq/airbyte/pull/45143) | don't overwrite (and delete) existing files, skip indexes instead |
| 1.0.4 | 2024-08-30 | [44933](https://github.com/airbytehq/airbyte/pull/44933) | Fix: Avro/Parquet: handle empty schemas in nested objects/lists |
| 1.0.3 | 2024-08-20 | [44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb |
| 1.0.2 | 2024-08-19 | [44401](https://github.com/airbytehq/airbyte/pull/44401) | Fix: S3 Avro/Parquet: handle nullable top-level schema |
| 1.0.1 | 2024-08-14 | [42579](https://github.com/airbytehq/airbyte/pull/42579) | OVERWRITE MODE: Deletes deferred until successful sync. |
| 1.0.0 | 2024-08-08 | [42409](https://github.com/airbytehq/airbyte/pull/42409) | Major breaking changes: new destination schema, change capture, Avro/Parquet improvements, bugfixes |
| 0.1.15 | 2024-12-18 | [49879](https://github.com/airbytehq/airbyte/pull/49879) | Use a base image: airbyte/java-connector-base:1.0.0 |
| 1.5.0 | 2024-11-08 | []() | Migrate to Bulk Load CDK; adds opt-in support for staging |
| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer |
| 1.3.0 | 2024-09-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | fix tests |
| 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields |
| 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams |
| 1.1.0 | 2024-09-18 | [45436](https://github.com/airbytehq/airbyte/pull/45436) | upgrade all dependencies |
| 1.0.5 | 2024-09-05 | [45143](https://github.com/airbytehq/airbyte/pull/45143) | don't overwrite (and delete) existing files, skip indexes instead |
| 1.0.4 | 2024-08-30 | [44933](https://github.com/airbytehq/airbyte/pull/44933) | Fix: Avro/Parquet: handle empty schemas in nested objects/lists |
| 1.0.3 | 2024-08-20 | [44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb |
| 1.0.2 | 2024-08-19 | [44401](https://github.com/airbytehq/airbyte/pull/44401) | Fix: S3 Avro/Parquet: handle nullable top-level schema |
| 1.0.1 | 2024-08-14 | [42579](https://github.com/airbytehq/airbyte/pull/42579) | OVERWRITE MODE: Deletes deferred until successful sync. |
| 1.0.0 | 2024-08-08 | [42409](https://github.com/airbytehq/airbyte/pull/42409) | Major breaking changes: new destination schema, change capture, Avro/Parquet improvements, bugfixes |
| 0.6.7 | 2024-08-11 | [43713](https://github.com/airbytehq/airbyte/issues/43713) | Decreased memory ratio (0.7 -> 0.5) and thread allocation (5 -> 2) for async S3 uploads. |
| 0.6.6 | 2024-08-06 | [43343](https://github.com/airbytehq/airbyte/pull/43343) | Use Kotlin 2.0.0 |
| 0.6.5 | 2024-08-01 | [42405](https://github.com/airbytehq/airbyte/pull/42405) | S3 parallelizes workloads, checkpoints, submits counts, support for generationId in metadata for refreshes. |
Expand Down

0 comments on commit 0159fef

Please sign in to comment.