Skip to content

Commit

Permalink
Allow to mutate the s3 requests
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberdelia committed Dec 30, 2019
1 parent 309d607 commit 0ca2cdc
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
6 changes: 4 additions & 2 deletions src/main/kotlin/com/lapanthere/signals/S3InputStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ internal val AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors()
* @param key Key of the object to download.
* @param parallelism The number of parts to download at a time.
* @param s3 The S3 client to be used during the download.
* @param mutator The function that mutates the request given to the S3 client.
*
*/
// TODO: Allow to pass others options.
class S3InputStream(
bucket: String,
key: String,
parallelism: Int = AVAILABLE_PROCESSORS,
s3: S3AsyncClient = S3AsyncClient.create()
s3: S3AsyncClient = S3AsyncClient.create(),
mutator: (GetObjectRequest.Builder) -> Unit = {}
) : InputStream() {
private val scope = CoroutineScope(Dispatchers.IO)
private val streams by lazy {
Expand All @@ -47,6 +48,7 @@ class S3InputStream(
scope.async(CoroutineName("chunk-${i + 1}"), CoroutineStart.LAZY) {
s3.getObject(
GetObjectRequest.builder()
.applyMutation(mutator)
.bucket(bucket)
.key(key)
.range("bytes=$begin-${begin + chunkSize - 1}")
Expand Down
8 changes: 4 additions & 4 deletions src/main/kotlin/com/lapanthere/signals/S3OutputStream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -90,25 +90,25 @@ internal data class Part(
* @param key Object key for which the multipart upload is to be initiated.
* @param parallelism The number of parts to upload at a time.
* @param s3 The S3 client to be used during the upload.
* @param mutator The function that mutates the request given to the S3 client.
*
*/
// TODO: Allow for ACLs and others options.
class S3OutputStream(
private val bucket: String,
private val key: String,
parallelism: Int = AVAILABLE_PROCESSORS,
private val s3: S3AsyncClient = S3AsyncClient.create()
private val s3: S3AsyncClient = S3AsyncClient.create(),
mutator: (CreateMultipartUploadRequest.Builder) -> Unit = {}
) : OutputStream() {
private val scope = CoroutineScope(Dispatchers.IO)
private val semaphore = Semaphore(parallelism)

// Might be replaceable by a Flow, once parallel execution is supported.
private val parts = mutableListOf<Deferred<CompletedPart>>()
private val buffer = ByteArrayOutputStream(MIN_PART_SIZE.toInt())
private val digest = DigestOutputStream(buffer, MessageDigest.getInstance("MD5"))
private var size: Long = MIN_PART_SIZE
private val uploadID = s3.createMultipartUpload(
CreateMultipartUploadRequest.builder()
.applyMutation(mutator)
.bucket(bucket)
.key(key)
.build()
Expand Down

0 comments on commit 0ca2cdc

Please sign in to comment.