From 773bc45f446d032c64ed335ba7e192913e85f91d Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Fri, 15 Nov 2024 11:44:35 -0800 Subject: [PATCH] [WIP] Prerelease S3V2 Connector --- .../airbyte/cdk/load/message/MessageQueue.kt | 2 +- .../ObjectStorageStreamLoaderFactory.kt | 3 ++ .../cdk/load/file/s3/S3MultipartUpload.kt | 18 ++++++++-- .../connectors/destination-s3-v2/build.gradle | 16 ++++----- .../destination-s3-v2/metadata.yaml | 35 +++++++++++++------ .../src/main/kotlin/S3V2Specification.kt | 7 ++++ docs/integrations/destinations/s3.md | 24 ++++++------- 7 files changed, 70 insertions(+), 35 deletions(-) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt index e9ae6c33e39f..d74a0056a5cf 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/MessageQueue.kt @@ -23,7 +23,7 @@ interface QueueWriter : CloseableCoroutine { interface MessageQueue : QueueReader, QueueWriter abstract class ChannelMessageQueue : MessageQueue { - open val channel = Channel(Channel.UNLIMITED) + open val channel: Channel = Channel(Channel.UNLIMITED) override suspend fun publish(message: T) = channel.send(message) override suspend fun consume(): Flow = channel.receiveAsFlow() diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt index 5f362fcffa49..e1682dbe42ec 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectStorageStreamLoaderFactory.kt @@ -30,6 +30,9 @@ import java.io.File import java.io.OutputStream import java.nio.file.Path import java.util.concurrent.atomic.AtomicLong +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.take @Singleton @Secondary diff --git a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt index b12dab4b4c52..6cc75f27ac15 100644 --- a/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt +++ b/airbyte-cdk/bulk/toolkits/load-s3/src/main/kotlin/io/airbyte/cdk/load/file/s3/S3MultipartUpload.kt @@ -20,6 +20,9 @@ import io.github.oshai.kotlinlogging.KotlinLogging import java.io.ByteArrayOutputStream import java.io.OutputStream import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicLong +import kotlin.time.measureTime +import kotlin.time.measureTimedValue import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -51,6 +54,7 @@ class S3MultipartUpload( private val wrappingBuffer = streamProcessor.wrapper(underlyingBuffer) private val partQueue = Channel(Channel.UNLIMITED) private val isClosed = AtomicBoolean(false) + private val channelSize = AtomicLong(0L) /** * Run the upload using the provided block. This should only be used by the @@ -73,6 +77,7 @@ class S3MultipartUpload( launch { val uploadedParts = mutableListOf() for (bytes in partQueue) { + channelSize.decrementAndGet() val part = uploadPart(bytes, uploadedParts) uploadedParts.add(part) } @@ -117,7 +122,11 @@ class S3MultipartUpload( wrappingBuffer.flush() val bytes = underlyingBuffer.toByteArray() underlyingBuffer.reset() - runBlocking { partQueue.send(bytes) } + channelSize.incrementAndGet() + val duration = measureTime { runBlocking { partQueue.send(bytes) } } + log.info { + "Enqueued part in $duration (channelSize = ${channelSize.get()}; uploadId = ${response.uploadId})" + } } private suspend fun uploadPart( @@ -132,10 +141,13 @@ class S3MultipartUpload( body = ByteStream.fromBytes(bytes) this.partNumber = partNumber } - val uploadResponse = client.uploadPart(request) + val uploadResponse = measureTimedValue { client.uploadPart(request) } + log.info { + "Uploaded part $partNumber in ${uploadResponse.duration} (channelSize = ${channelSize.get()}; uploadId = ${response.uploadId})" + } return CompletedPart { this.partNumber = partNumber - this.eTag = uploadResponse.eTag + this.eTag = uploadResponse.value.eTag } } diff --git a/airbyte-integrations/connectors/destination-s3-v2/build.gradle b/airbyte-integrations/connectors/destination-s3-v2/build.gradle index d370a2d14c0c..a48674517442 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/build.gradle +++ b/airbyte-integrations/connectors/destination-s3-v2/build.gradle @@ -17,14 +17,14 @@ application { // Uncomment to run locally: // '--add-opens', 'java.base/java.lang=ALL-UNNAMED' // Uncomment to enable remote profiling: -// '-XX:NativeMemoryTracking=detail', -// '-Djava.rmi.server.hostname=localhost', -// '-Dcom.sun.management.jmxremote=true', -// '-Dcom.sun.management.jmxremote.port=6000', -// '-Dcom.sun.management.jmxremote.rmi.port=6000', -// '-Dcom.sun.management.jmxremote.local.only=false', -// '-Dcom.sun.management.jmxremote.authenticate=false', -// '-Dcom.sun.management.jmxremote.ssl=false' + '-XX:NativeMemoryTracking=detail', + '-Djava.rmi.server.hostname=localhost', + '-Dcom.sun.management.jmxremote=true', + '-Dcom.sun.management.jmxremote.port=6000', + '-Dcom.sun.management.jmxremote.rmi.port=6000', + '-Dcom.sun.management.jmxremote.local.only=false', + '-Dcom.sun.management.jmxremote.authenticate=false', + '-Dcom.sun.management.jmxremote.ssl=false' ] } diff --git a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml index d4572456a265..e6619fdb6026 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-v2/metadata.yaml @@ -1,27 +1,40 @@ data: connectorSubtype: file connectorType: destination - definitionId: d6116991-e809-4c7c-ae09-c64712df5b66 - dockerImageTag: 0.3.5 - dockerRepository: airbyte/destination-s3-v2 - githubIssueLabel: destination-s3-v2 + definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 + dockerImageTag: 1.5.0 + dockerRepository: airbyte/destination-s3 + githubIssueLabel: destination-s3 icon: s3.svg license: ELv2 - name: S3 V2 Destination + name: S3 registryOverrides: cloud: - enabled: false + enabled: true oss: - enabled: false - releaseStage: alpha + enabled: true + releaseStage: generally_available + releases: + breakingChanges: + 1.0.0: + message: > + **This release includes breaking changes, including major revisions to the schema of stored data. Do not upgrade without reviewing the migration guide.** + upgradeDeadline: "2024-10-08" + resourceRequirements: + jobSpecific: + - jobType: sync + resourceRequirements: + memory_limit: 2Gi + memory_request: 2Gi documentationUrl: https://docs.airbyte.com/integrations/destinations/s3 tags: - language:java ab_internal: - sl: 100 - ql: 100 - supportLevel: community + sl: 300 + ql: 300 + supportLevel: certified supportsRefreshes: true + supportsFileTransfer: true connectorTestSuitesOptions: - suite: unitTests - suite: integrationTests diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt index 536d5673d22e..8b216bbbfd0d 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/main/kotlin/S3V2Specification.kt @@ -4,6 +4,7 @@ package io.airbyte.integrations.destination.s3_v2 +import com.fasterxml.jackson.annotation.JsonProperty import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import io.airbyte.cdk.command.ConfigurationSpecification @@ -81,6 +82,12 @@ class S3V2Specification : // // @get:JsonSchemaInject(json = """{"examples":["__staging/data_sync/test"],"order":11}""") // override val s3StagingPrefix: String? = null + + @get:JsonProperty("num_process_records_workers") + val numProcessRecordsWorkers: Int? = 2 + + @get:JsonProperty("estimated_record_memory_overhead_ratio") + val estimatedRecordMemoryOverheadRatio: Double? = 5.0 } @Singleton diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index 1cc5220b6a28..ebf35fe2dd3a 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -544,18 +544,18 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| -| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer | -| 1.3.0 | 2024-09-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | fix tests | -| 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields | -| 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams | -| 1.1.0 | 2024-09-18 | [45436](https://github.com/airbytehq/airbyte/pull/45436) | upgrade all dependencies | -| 1.0.5 | 2024-09-05 | [45143](https://github.com/airbytehq/airbyte/pull/45143) | don't overwrite (and delete) existing files, skip indexes instead | -| 1.0.4 | 2024-08-30 | [44933](https://github.com/airbytehq/airbyte/pull/44933) | Fix: Avro/Parquet: handle empty schemas in nested objects/lists | -| 1.0.3 | 2024-08-20 | [44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb | -| 1.0.2 | 2024-08-19 | [44401](https://github.com/airbytehq/airbyte/pull/44401) | Fix: S3 Avro/Parquet: handle nullable top-level schema | -| 1.0.1 | 2024-08-14 | [42579](https://github.com/airbytehq/airbyte/pull/42579) | OVERWRITE MODE: Deletes deferred until successful sync. | -| 1.0.0 | 2024-08-08 | [42409](https://github.com/airbytehq/airbyte/pull/42409) | Major breaking changes: new destination schema, change capture, Avro/Parquet improvements, bugfixes | -| 0.1.15 | 2024-12-18 | [49879](https://github.com/airbytehq/airbyte/pull/49879) | Use a base image: airbyte/java-connector-base:1.0.0 | +| 1.5.0 | 2024-11-08 | []() | Migrate to Bulk Load CDK; adds opt-in support for staging | +| 1.4.0 | 2024-10-23 | [46302](https://github.com/airbytehq/airbyte/pull/46302) | add support for file transfer | +| 1.3.0 | 2024-09-30 | [46281](https://github.com/airbytehq/airbyte/pull/46281) | fix tests | +| 1.2.1 | 2024-09-20 | [45700](https://github.com/airbytehq/airbyte/pull/45700) | Improve resiliency to jsonschema fields | +| 1.2.0 | 2024-09-18 | [45402](https://github.com/airbytehq/airbyte/pull/45402) | fix exception with columnless streams | +| 1.1.0 | 2024-09-18 | [45436](https://github.com/airbytehq/airbyte/pull/45436) | upgrade all dependencies | +| 1.0.5 | 2024-09-05 | [45143](https://github.com/airbytehq/airbyte/pull/45143) | don't overwrite (and delete) existing files, skip indexes instead | +| 1.0.4 | 2024-08-30 | [44933](https://github.com/airbytehq/airbyte/pull/44933) | Fix: Avro/Parquet: handle empty schemas in nested objects/lists | +| 1.0.3 | 2024-08-20 | [44476](https://github.com/airbytehq/airbyte/pull/44476) | Increase message parsing limit to 100mb | +| 1.0.2 | 2024-08-19 | [44401](https://github.com/airbytehq/airbyte/pull/44401) | Fix: S3 Avro/Parquet: handle nullable top-level schema | +| 1.0.1 | 2024-08-14 | [42579](https://github.com/airbytehq/airbyte/pull/42579) | OVERWRITE MODE: Deletes deferred until successful sync. | +| 1.0.0 | 2024-08-08 | [42409](https://github.com/airbytehq/airbyte/pull/42409) | Major breaking changes: new destination schema, change capture, Avro/Parquet improvements, bugfixes | | 0.6.7 | 2024-08-11 | [43713](https://github.com/airbytehq/airbyte/issues/43713) | Decreased memory ratio (0.7 -> 0.5) and thread allocation (5 -> 2) for async S3 uploads. | | 0.6.6 | 2024-08-06 | [43343](https://github.com/airbytehq/airbyte/pull/43343) | Use Kotlin 2.0.0 | | 0.6.5 | 2024-08-01 | [42405](https://github.com/airbytehq/airbyte/pull/42405) | S3 parallelizes workloads, checkpoints, submits counts, support for generationId in metadata for refreshes. |