From 65de9bcdd8e031bf8c2e43ada1117d604e578f6d Mon Sep 17 00:00:00 2001 From: Gustav Grusell Date: Mon, 24 Jun 2024 22:15:33 +0200 Subject: [PATCH] feat: support s3 urls for input and output This commit add support for using s3 urls on the format s3:/// in both input and output. If ans s3 URL is used as input, a presigned URL is created and used as input to ffmpeg. The duration of the presigned URLs can be controlled with the 'remote-files.s3.presignDurationSeconds' config property. If an s3 URL is used for 'outputFolder', output will first be stored locally and then uploaded to s3 once transcoding is finished. Aws credentials are read with DefaultCredentialsProvider, meaning aws credentials can be provided in a number of ways, see https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/auth/credentials/DefaultCredentialsProvider.html; Not that when using s3 urls for input, the presigned URLs will be shown in the logs. If this is not desirable, setting logging.config (or env variable LOGGING_CONFIG) to 'classpath:logback-json-mask-s3-presign.xml' will use a log config that masks the presign query parameters. By setting env variable REMOTEFILES_S3_ANONYMOUSACCESS to true, s3 urls will be accessed in anonymous mode, corresponding to using the '--no-sign-request' flag with the aws cli. Any s3 access key or secrets key configured will be ignored. Multipart upload will be disabled in this case since the s3 sdk does not support multipart upload when using anonymous access. Signed-off-by: Gustav Grusell --- checks.gradle | 2 + encore-common/build.gradle.kts | 3 + .../oss/encore/S3RemoteFilesConfiguration.kt | 81 +++++++++++++ .../se/svt/oss/encore/model/input/Input.kt | 13 ++- .../svt/oss/encore/service/EncoreService.kt | 8 +- .../service/localencode/LocalEncodeService.kt | 28 ++++- .../mediaanalyzer/MediaAnalyzerService.kt | 31 +++-- .../service/remotefiles/RemoteFileHandler.kt | 11 ++ .../service/remotefiles/RemoteFileService.kt | 57 ++++++++++ .../service/remotefiles/s3/S3Properties.kt | 17 +++ .../remotefiles/s3/S3RemoteFileHandler.kt | 66 +++++++++++ .../service/remotefiles/s3/S3UriConverter.kt | 38 +++++++ .../oss/encore/EncoreIntegrationTestBase.kt | 29 +++-- .../svt/oss/encore/EncoreS3IntegrationTest.kt | 107 ++++++++++++++++++ .../service/remotefiles/S3UriConverterTest.kt | 70 ++++++++++++ .../test/resources/application-test-s3.yml | 3 + .../se/svt/oss/encore/RedisExtension.kt | 11 -- .../se/svt/oss/encore/S3StorageExtension.kt | 32 ++++++ .../se/svt/oss/encore/TestFixtureUtils.kt | 16 +++ .../logback-json-mask-s3-presign.xml | 18 +++ .../logback-json-mask-s3-presign.xml | 18 +++ 21 files changed, 619 insertions(+), 40 deletions(-) create mode 100644 encore-common/src/main/kotlin/se/svt/oss/encore/S3RemoteFilesConfiguration.kt create mode 100644 encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileHandler.kt create mode 100644 encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileService.kt create mode 100644 encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3Properties.kt create mode 100644 encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3RemoteFileHandler.kt create mode 100644 encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3UriConverter.kt create mode 100644 encore-common/src/test/kotlin/se/svt/oss/encore/EncoreS3IntegrationTest.kt create mode 100644 encore-common/src/test/kotlin/se/svt/oss/encore/service/remotefiles/S3UriConverterTest.kt create mode 100644 encore-common/src/test/resources/application-test-s3.yml create mode 100644 encore-common/src/testFixtures/kotlin/se/svt/oss/encore/S3StorageExtension.kt create mode 100644 encore-common/src/testFixtures/kotlin/se/svt/oss/encore/TestFixtureUtils.kt create mode 100644 encore-web/src/main/resources/logback-json-mask-s3-presign.xml create mode 100644 encore-worker/src/main/resources/logback-json-mask-s3-presign.xml diff --git a/checks.gradle b/checks.gradle index 64012bd..7ab55e9 100644 --- a/checks.gradle +++ b/checks.gradle @@ -11,6 +11,8 @@ jacocoTestCoverageVerification { '*.static {...}', '*.model.*.get*', '*.service.localencode.LocalEncodeService.moveFile*', + '*.S3Properties*.get*()', + '*RemoteFileService.DefaultHandler.*', '*QueueService.getQueue*', '*QueueService.migrateQueues()', '*.ShutdownHandler.*', diff --git a/encore-common/build.gradle.kts b/encore-common/build.gradle.kts index 283850d..8086df9 100644 --- a/encore-common/build.gradle.kts +++ b/encore-common/build.gradle.kts @@ -16,6 +16,8 @@ dependencies { implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-slf4j") + implementation(platform("software.amazon.awssdk:bom:2.29.2")) + implementation("software.amazon.awssdk:s3") testImplementation(project(":encore-web")) testImplementation("org.springframework.security:spring-security-test") @@ -26,6 +28,7 @@ dependencies { testFixturesImplementation("com.redis:testcontainers-redis:2.2.4") testFixturesImplementation("io.github.microutils:kotlin-logging:3.0.5") testFixturesImplementation("org.junit.jupiter:junit-jupiter-api") + testFixturesImplementation("org.testcontainers:localstack:1.20.3") testFixturesRuntimeOnly("org.junit.platform:junit-platform-launcher") } diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/S3RemoteFilesConfiguration.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/S3RemoteFilesConfiguration.kt new file mode 100644 index 0000000..41118d1 --- /dev/null +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/S3RemoteFilesConfiguration.kt @@ -0,0 +1,81 @@ +// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB +// +// SPDX-License-Identifier: EUPL-1.2 + +package se.svt.oss.encore + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import se.svt.oss.encore.service.remotefiles.s3.S3Properties +import se.svt.oss.encore.service.remotefiles.s3.S3RemoteFileHandler +import se.svt.oss.encore.service.remotefiles.s3.S3UriConverter +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.s3.S3AsyncClient +import software.amazon.awssdk.services.s3.S3Configuration +import software.amazon.awssdk.services.s3.presigner.S3Presigner +import java.net.URI + +@ConditionalOnProperty("remote-files.s3.enabled", havingValue = "true") +@EnableConfigurationProperties(S3Properties::class) +@Configuration +class S3RemoteFilesConfiguration { + + @Bean + fun s3Region() = + Region.of(System.getProperty("aws.region") ?: System.getenv("AWS_REGION") ?: "us-east-1") + + @Bean + fun s3Client(s3Region: Region, s3Properties: S3Properties) = S3AsyncClient.builder() + .region(s3Region) + .crossRegionAccessEnabled(true) + .multipartEnabled(!s3Properties.anonymousAccess) // Multipart upload requires credentials + .serviceConfiguration( + S3Configuration.builder() + .pathStyleAccessEnabled(true) + .build(), + ) + .credentialsProvider( + if (s3Properties.anonymousAccess) { + AnonymousCredentialsProvider.create() + } else { + DefaultCredentialsProvider.create() + }, + ) + .apply { + if (!s3Properties.endpoint.isNullOrBlank()) { + endpointOverride(URI.create(s3Properties.endpoint)) + } + } + .build() + + @Bean + fun s3Presigner(s3Region: Region, s3Properties: S3Properties) = S3Presigner.builder() + .region(s3Region) + .serviceConfiguration( + S3Configuration.builder() + .pathStyleAccessEnabled(true) + .build(), + ) + .apply { + if (!s3Properties.endpoint.isNullOrBlank()) { + endpointOverride(URI.create(s3Properties.endpoint)) + } + } + .build() + + @Bean + fun s3UriConverter(s3Properties: S3Properties, s3Region: Region) = S3UriConverter(s3Properties, s3Region) + + @Bean + fun s3RemoteFileHandler( + s3Client: S3AsyncClient, + s3Presigner: S3Presigner, + s3Properties: S3Properties, + s3UriConverter: S3UriConverter, + ) = + S3RemoteFileHandler(s3Client, s3Presigner, s3Properties, s3UriConverter) +} diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/model/input/Input.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/model/input/Input.kt index bc99b60..53c2ed5 100644 --- a/encore-common/src/main/kotlin/se/svt/oss/encore/model/input/Input.kt +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/model/input/Input.kt @@ -35,6 +35,8 @@ sealed interface Input { @get:Schema(description = "URI of input file", required = true, example = "/path/to/file.mp4") val uri: String + var accessUri: String + @get:Schema(description = "Input params required to properly decode input", example = """{ "ac": "2" }""") val params: LinkedHashMap @@ -167,6 +169,9 @@ data class AudioInput( override val type: String get() = TYPE_AUDIO + @JsonIgnore + override var accessUri: String = uri + override fun withSeekTo(seekTo: Double) = copy(seekTo = seekTo) val duration: Double @@ -188,6 +193,9 @@ data class VideoInput( override val seekTo: Double? = null, override val copyTs: Boolean = false, ) : VideoIn { + @JsonIgnore + override var accessUri: String = uri + override val analyzedVideo: VideoFile @JsonIgnore get() = analyzed as? VideoFile ?: throw RuntimeException("Analyzed video for $uri is ${analyzed?.type}") @@ -221,6 +229,9 @@ data class AudioVideoInput( override val copyTs: Boolean = false, ) : VideoIn, AudioIn { + @JsonIgnore + override var accessUri: String = uri + override val analyzedVideo: VideoFile @JsonIgnore get() = analyzed as? VideoFile ?: throw RuntimeException("Analyzed audio/video for $uri is ${analyzed?.type}") @@ -245,7 +256,7 @@ fun List.inputParams(readDuration: Double?): List = (readDuration?.let { listOf("-t", "$it") } ?: emptyList()) + (input.seekTo?.let { listOf("-ss", "$it") } ?: emptyList()) + (if (input.copyTs) listOf("-copyts") else emptyList()) + - listOf("-i", input.uri) + listOf("-i", input.accessUri) } fun List.maxDuration(): Double? = maxOfOrNull { diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/service/EncoreService.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/service/EncoreService.kt index 5e5f011..345662f 100644 --- a/encore-common/src/main/kotlin/se/svt/oss/encore/service/EncoreService.kt +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/service/EncoreService.kt @@ -45,6 +45,7 @@ import se.svt.oss.encore.service.callback.CallbackService import se.svt.oss.encore.service.localencode.LocalEncodeService import se.svt.oss.encore.service.mediaanalyzer.MediaAnalyzerService import se.svt.oss.encore.service.queue.QueueService +import se.svt.oss.encore.service.remotefiles.RemoteFileService import se.svt.oss.mediaanalyzer.file.MediaContainer import se.svt.oss.mediaanalyzer.file.MediaFile import java.io.File @@ -67,6 +68,7 @@ class EncoreService( private val localEncodeService: LocalEncodeService, private val encoreProperties: EncoreProperties, private val queueService: QueueService, + private val remoteFileService: RemoteFileService, ) { private val cancelTopicName = "cancel" @@ -227,7 +229,7 @@ class EncoreService( callbackService.sendProgressCallback(encoreJob) } finally { redisMessageListerenerContainer.removeMessageListener(cancelListener) - localEncodeService.cleanup(outputFolder) + localEncodeService.cleanup(outputFolder, encoreJob) } } @@ -270,6 +272,10 @@ class EncoreService( } private fun initJob(encoreJob: EncoreJob) { + encoreJob.inputs.forEach { input -> + input.accessUri = remoteFileService.getAccessUri(input.uri) + } + encoreJob.inputs.forEach { input -> mediaAnalyzerService.analyzeInput(input) } diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/service/localencode/LocalEncodeService.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/service/localencode/LocalEncodeService.kt index cf9e66f..8c3a3d1 100644 --- a/encore-common/src/main/kotlin/se/svt/oss/encore/service/localencode/LocalEncodeService.kt +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/service/localencode/LocalEncodeService.kt @@ -9,11 +9,13 @@ import org.springframework.stereotype.Service import se.svt.oss.encore.config.EncoreProperties import se.svt.oss.encore.model.EncoreJob import se.svt.oss.encore.process.createTempDir +import se.svt.oss.encore.service.remotefiles.RemoteFileService import se.svt.oss.mediaanalyzer.file.AudioFile import se.svt.oss.mediaanalyzer.file.ImageFile import se.svt.oss.mediaanalyzer.file.MediaFile import se.svt.oss.mediaanalyzer.file.VideoFile import java.io.File +import java.net.URI import java.nio.file.Files import java.nio.file.Path import java.nio.file.StandardCopyOption @@ -23,11 +25,12 @@ private val log = KotlinLogging.logger {} @Service class LocalEncodeService( private val encoreProperties: EncoreProperties, + private val remoteFileService: RemoteFileService, ) { fun outputFolder( encoreJob: EncoreJob, - ): String = if (encoreProperties.localTemporaryEncode) { + ): String = if (encoreProperties.localTemporaryEncode || remoteFileService.isRemoteFile(encoreJob.outputFolder)) { createTempDir("job_${encoreJob.id}").toString() } else { encoreJob.outputFolder @@ -38,6 +41,23 @@ class LocalEncodeService( output: List, encoreJob: EncoreJob, ): List { + if (remoteFileService.isRemoteFile(encoreJob.outputFolder)) { + log.debug { "Moving files to output destination ${encoreJob.outputFolder}, from local temp $outputFolder" } + File(outputFolder).listFiles()?.forEach { localFile -> + val remoteFile = URI.create(encoreJob.outputFolder).resolve(localFile.name).toString() + remoteFileService.upload(localFile.toString(), remoteFile) + } + val files = output.map { + val resolvedPath = URI.create(encoreJob.outputFolder).resolve(Path.of(it.file).fileName.toString()).toString() + when (it) { + is VideoFile -> it.copy(file = resolvedPath) + is AudioFile -> it.copy(file = resolvedPath) + is ImageFile -> it.copy(file = resolvedPath) + else -> throw Exception("Invalid conversion") + } + } + return files + } if (encoreProperties.localTemporaryEncode) { val destination = File(encoreJob.outputFolder) log.debug { "Moving files to correct outputFolder ${encoreJob.outputFolder}, from local temp $outputFolder" } @@ -50,8 +70,10 @@ class LocalEncodeService( return output } - fun cleanup(tempDirectory: String?) { - if (tempDirectory != null && encoreProperties.localTemporaryEncode) { + fun cleanup(tempDirectory: String?, encoreJob: EncoreJob) { + if (tempDirectory != null && + (encoreProperties.localTemporaryEncode || remoteFileService.isRemoteFile(encoreJob.outputFolder)) + ) { File(tempDirectory).deleteRecursively() } } diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/service/mediaanalyzer/MediaAnalyzerService.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/service/mediaanalyzer/MediaAnalyzerService.kt index c52aef9..a420504 100644 --- a/encore-common/src/main/kotlin/se/svt/oss/encore/service/mediaanalyzer/MediaAnalyzerService.kt +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/service/mediaanalyzer/MediaAnalyzerService.kt @@ -22,6 +22,8 @@ import se.svt.oss.mediaanalyzer.ffprobe.SideData import se.svt.oss.mediaanalyzer.ffprobe.UnknownSideData import se.svt.oss.mediaanalyzer.ffprobe.UnknownStream import se.svt.oss.mediaanalyzer.file.AudioFile +import se.svt.oss.mediaanalyzer.file.ImageFile +import se.svt.oss.mediaanalyzer.file.SubtitleFile import se.svt.oss.mediaanalyzer.file.VideoFile import se.svt.oss.mediaanalyzer.mediainfo.AudioTrack import se.svt.oss.mediaanalyzer.mediainfo.GeneralTrack @@ -58,20 +60,25 @@ class MediaAnalyzerService(private val mediaAnalyzer: MediaAnalyzer) { val useFirstAudioStreams = (input as? AudioIn)?.channelLayout?.channels?.size input.analyzed = mediaAnalyzer.analyze( - file = input.uri, + file = input.accessUri, probeInterlaced = probeInterlaced, ffprobeInputParams = input.params, - ).let { - val selectedVideoStream = (input as? VideoIn)?.videoStream - val selectedAudioStream = (input as? AudioIn)?.audioStream - when (it) { - is VideoFile -> it.selectVideoStream(selectedVideoStream) - .selectAudioStream(selectedAudioStream) - .trimAudio(useFirstAudioStreams) - is AudioFile -> it.selectAudioStream(selectedAudioStream) - .trimAudio(useFirstAudioStreams) - else -> it + ) + .let { + val selectedVideoStream = (input as? VideoIn)?.videoStream + val selectedAudioStream = (input as? AudioIn)?.audioStream + when (it) { + is VideoFile -> it.selectVideoStream(selectedVideoStream) + .selectAudioStream(selectedAudioStream) + .trimAudio(useFirstAudioStreams) + .copy(file = input.uri) + is AudioFile -> it.selectAudioStream(selectedAudioStream) + .trimAudio(useFirstAudioStreams) + .copy(file = input.uri) + is ImageFile -> it.copy(file = input.uri) + is SubtitleFile -> it.copy(file = input.uri) + else -> it + } } - } } } diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileHandler.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileHandler.kt new file mode 100644 index 0000000..6fd3956 --- /dev/null +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileHandler.kt @@ -0,0 +1,11 @@ +// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB +// +// SPDX-License-Identifier: EUPL-1.2 + +package se.svt.oss.encore.service.remotefiles + +interface RemoteFileHandler { + fun getAccessUri(uri: String): String + fun upload(localFile: String, remoteFile: String) + val protocols: List +} diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileService.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileService.kt new file mode 100644 index 0000000..fab3126 --- /dev/null +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/RemoteFileService.kt @@ -0,0 +1,57 @@ +// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB +// +// SPDX-License-Identifier: EUPL-1.2 + +package se.svt.oss.encore.service.remotefiles + +import io.github.oshai.kotlinlogging.KotlinLogging +import org.springframework.stereotype.Service +import java.net.URI + +private val log = KotlinLogging.logger {} + +@Service +class RemoteFileService(private val remoteFileHandlers: List) { + + private val defaultHandler = DefaultHandler() + + fun isRemoteFile(uriOrPath: String): Boolean { + val uri = URI.create(uriOrPath) + return !(uri.scheme.isNullOrEmpty() || uri.scheme.lowercase() == "file") + } + + fun getAccessUri(uriOrPath: String): String { + val uri = URI.create(uriOrPath) + return getHandler(uri).getAccessUri(uriOrPath) + } + + fun upload(localFile: String, remoteFile: String) { + val uri = URI.create(remoteFile) + getHandler(uri).upload(localFile, remoteFile) + } + + private fun getHandler(uri: URI): RemoteFileHandler { + log.info { "Getting handler for uri $uri. Available protocols: ${remoteFileHandlers.flatMap {it.protocols} }" } + if (uri.scheme.isNullOrEmpty() || uri.scheme.lowercase() == "file") { + return defaultHandler + } + val handler = remoteFileHandlers.firstOrNull { it.protocols.contains(uri.scheme) } + if (handler != null) { + return handler + } + log.info { "No remote file handler found for protocol ${uri.scheme}. Using default handler." } + return defaultHandler + } + + /** Handler user for protocols where no specific handler is defined. Works for local files and + * any protocols that ffmpeg supports natively */ + private class DefaultHandler : RemoteFileHandler { + override fun getAccessUri(uri: String): String = uri + + override fun upload(localFile: String, remoteFile: String) { + // Do nothing + } + + override val protocols: List = emptyList() + } +} diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3Properties.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3Properties.kt new file mode 100644 index 0000000..48ff008 --- /dev/null +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3Properties.kt @@ -0,0 +1,17 @@ +// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB +// +// SPDX-License-Identifier: EUPL-1.2 + +package se.svt.oss.encore.service.remotefiles.s3 + +import org.springframework.boot.context.properties.ConfigurationProperties +import java.time.Duration + +@ConfigurationProperties("remote-files.s3") +data class S3Properties( + val enabled: Boolean = false, + val anonymousAccess: Boolean = false, + val endpoint: String = "", + val presignDurationSeconds: Long = Duration.ofHours(12).seconds, + val uploadTimeoutSeconds: Long = Duration.ofHours(1).seconds, +) diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3RemoteFileHandler.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3RemoteFileHandler.kt new file mode 100644 index 0000000..c485bbe --- /dev/null +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3RemoteFileHandler.kt @@ -0,0 +1,66 @@ +// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB +// +// SPDX-License-Identifier: EUPL-1.2 + +package se.svt.oss.encore.service.remotefiles.s3 + +import io.github.oshai.kotlinlogging.KotlinLogging +import se.svt.oss.encore.service.remotefiles.RemoteFileHandler +import software.amazon.awssdk.services.s3.S3AsyncClient +import software.amazon.awssdk.services.s3.model.GetObjectRequest +import software.amazon.awssdk.services.s3.model.PutObjectRequest +import software.amazon.awssdk.services.s3.presigner.S3Presigner +import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest +import java.net.URI +import java.nio.file.Paths +import java.util.concurrent.TimeUnit + +private val log = KotlinLogging.logger {} + +class S3RemoteFileHandler( + private val client: S3AsyncClient, + private val presigner: S3Presigner, + private val s3Properties: S3Properties, + private val s3UriConverter: S3UriConverter, +) : RemoteFileHandler { + + override fun getAccessUri(uri: String): String { + val s3Uri = URI.create(uri) + + if (s3Properties.anonymousAccess) { + return s3UriConverter.toHttp(s3Uri) + } + return presignUrl(s3Uri) + } + + private fun presignUrl(s3Uri: URI): String { + val (bucket, key) = s3UriConverter.getBucketAndKey(s3Uri) + + val objectRequest: GetObjectRequest = GetObjectRequest.builder() + .bucket(bucket) + .key(key) + .build() + val presignRequest: GetObjectPresignRequest = GetObjectPresignRequest.builder() + .signatureDuration(java.time.Duration.ofSeconds(s3Properties.presignDurationSeconds)) + .getObjectRequest(objectRequest) + .build() + + val presignedRequest = presigner.presignGetObject(presignRequest) + val url = presignedRequest.url().toExternalForm() + return url + } + + override fun upload(localFile: String, remoteFile: String) { + log.info { "Uploading $localFile to $remoteFile" } + val s3Uri = URI.create(remoteFile) + val (bucket, key) = s3UriConverter.getBucketAndKey(s3Uri) + val putObjectRequest: PutObjectRequest = PutObjectRequest.builder() + .bucket(bucket) + .key(key) + .build() + val res = client.putObject(putObjectRequest, Paths.get(localFile)).get(s3Properties.presignDurationSeconds, TimeUnit.SECONDS) + log.info { "Upload result: $res" } + } + + override val protocols = listOf("s3") +} diff --git a/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3UriConverter.kt b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3UriConverter.kt new file mode 100644 index 0000000..b675522 --- /dev/null +++ b/encore-common/src/main/kotlin/se/svt/oss/encore/service/remotefiles/s3/S3UriConverter.kt @@ -0,0 +1,38 @@ +// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB +// +// SPDX-License-Identifier: EUPL-1.2 + +package se.svt.oss.encore.service.remotefiles.s3 + +import software.amazon.awssdk.regions.Region +import java.net.URI + +class S3UriConverter( + private val s3Properties: S3Properties, + private val region: Region, +) { + + fun toHttp(s3Uri: URI): String { + if (s3Uri.scheme != "s3") { + throw IllegalArgumentException("Invalid URI: $s3Uri") + } + val bucket = s3Uri.host + val key = s3Uri.path.stripLeadingSlash() + + if (s3Properties.endpoint.isNotBlank()) { + return "https://$bucket.${s3Properties.endpoint}/$key" + } + return "https://$bucket.s3.$region.amazonaws.com/$key" + } + + fun getBucketAndKey(s3Uri: URI): Pair { + if (s3Uri.scheme != "s3") { + throw IllegalArgumentException("Invalid URI: $s3Uri") + } + val bucket = s3Uri.host + val key = s3Uri.path.stripLeadingSlash() + return Pair(bucket, key) + } + + private fun String.stripLeadingSlash() = if (startsWith("/")) substring(1) else this +} diff --git a/encore-common/src/test/kotlin/se/svt/oss/encore/EncoreIntegrationTestBase.kt b/encore-common/src/test/kotlin/se/svt/oss/encore/EncoreIntegrationTestBase.kt index 4ad1919..3376223 100644 --- a/encore-common/src/test/kotlin/se/svt/oss/encore/EncoreIntegrationTestBase.kt +++ b/encore-common/src/test/kotlin/se/svt/oss/encore/EncoreIntegrationTestBase.kt @@ -143,18 +143,23 @@ class EncoreIntegrationTestBase(val wireMockRuntimeInfo: WireMockRuntimeInfo) { logContext = mapOf("FlowId" to UUID.randomUUID().toString()), ) - fun defaultExpectedOutputFiles(outputDir: File, testFile: Resource): List = listOf( - expectedFile(outputDir, testFile, "x264_3100.mp4"), - expectedFile(outputDir, testFile, "x264_2069.mp4"), - expectedFile(outputDir, testFile, "x264_1312.mp4"), - expectedFile(outputDir, testFile, "x264_806.mp4"), - expectedFile(outputDir, testFile, "x264_324.mp4"), - expectedFile(outputDir, testFile, "STEREO.mp4"), - expectedFile(outputDir, testFile, "thumb01.jpg"), - expectedFile(outputDir, testFile, "thumb02.jpg"), - expectedFile(outputDir, testFile, "thumb03.jpg"), - expectedFile(outputDir, testFile, "12x20_160x90_thumbnail_map.jpg"), - ) + fun defaultExpectedOutputFileSuffixes() = + listOf( + "x264_3100.mp4", + "x264_2069.mp4", + "x264_1312.mp4", + "x264_806.mp4", + "x264_324.mp4", + "STEREO.mp4", + "thumb01.jpg", + "thumb02.jpg", + "thumb03.jpg", + "12x20_160x90_thumbnail_map.jpg", + ) + + fun defaultExpectedOutputFiles(outputDir: File, testFile: Resource): List = defaultExpectedOutputFileSuffixes().map { + expectedFile(outputDir, testFile, it) + } fun expectedFile(outputDir: File, baseName: String, suffix: String) = "${outputDir.absolutePath}/${baseName}_$suffix" diff --git a/encore-common/src/test/kotlin/se/svt/oss/encore/EncoreS3IntegrationTest.kt b/encore-common/src/test/kotlin/se/svt/oss/encore/EncoreS3IntegrationTest.kt new file mode 100644 index 0000000..29d47b1 --- /dev/null +++ b/encore-common/src/test/kotlin/se/svt/oss/encore/EncoreS3IntegrationTest.kt @@ -0,0 +1,107 @@ +// SPDX-FileCopyrightText: 2020 Sveriges Television AB +// +// SPDX-License-Identifier: EUPL-1.2 + +package se.svt.oss.encore + +import com.fasterxml.jackson.module.kotlin.readValue +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo +import com.github.tomakehurst.wiremock.junit5.WireMockTest +import mu.KotlinLogging +import org.awaitility.Durations +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.api.io.TempDir +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.test.context.ActiveProfiles +import se.svt.oss.encore.Assertions.assertThat +import se.svt.oss.encore.model.Status +import se.svt.oss.encore.model.callback.JobProgress +import se.svt.oss.encore.model.input.AudioVideoInput +import software.amazon.awssdk.services.s3.S3AsyncClient +import java.io.File +import java.nio.file.Paths + +@ExtendWith(S3StorageExtension::class) +@ActiveProfiles(profiles = ["test-local", "test-s3"]) +@WireMockTest +class EncoreS3IntegrationTest(wireMockRuntimeInfo: WireMockRuntimeInfo) : EncoreIntegrationTestBase(wireMockRuntimeInfo) { + private val log = KotlinLogging.logger {} + + @Autowired + lateinit var s3Client: S3AsyncClient + + val inputBucket = "input-bucket" + val outputBucket = "output-bucket" + + @BeforeEach + override fun setUp() { + super.setUp() + + listOf(inputBucket, outputBucket).forEach { bucket -> + s3Client.createBucket { it.bucket(bucket) } + .get() + } + } + + @AfterEach + fun tearDown() { + listOf(inputBucket, outputBucket).forEach { bucket -> + s3Client.listObjects { it.bucket(bucket) } + .get() + .contents() + .forEach { obj -> + s3Client.deleteObject { it.bucket(bucket).key(obj.key()) } + .get() + } + s3Client.deleteBucket { it.bucket(bucket) } + .get() + } + } + + @Test + fun jobWiths3InputAndOutputIsSuccessful(@TempDir outputDir: File) { + val filename = "test.mp4" + val remoteInput = uploadInputfile(testFileSurround.file.absolutePath, filename) + + val job = job(outputDir = outputDir, file = testFileSurround) + .copy( + outputFolder = "s3://$outputBucket/output/", + inputs = listOf(AudioVideoInput(uri = remoteInput)), + ) + + val createdJob = createAndAwaitJob( + job = job, + timeout = Durations.FIVE_MINUTES, + ) { it.status.isCompleted } + + assertThat(createdJob).hasStatus(Status.SUCCESSFUL) + + val progressCalls = wireMockRuntimeInfo + .wireMock + .serveEvents.map { objectMapper.readValue(it.request.bodyAsString) } + assertThat(progressCalls.first()) + .hasStatus(Status.SUCCESSFUL) + + val expectedFiles = (defaultExpectedOutputFileSuffixes() + listOf("SURROUND.mp4")) + .map { "output/${createdJob.baseName}_$it" } + + val actualFiles = s3Client.listObjectsV2 { + it.bucket(outputBucket) + .prefix("output/") + } + .get() + .contents() + .map { it.key() ?: "" } + assertThat(actualFiles).containsExactlyInAnyOrder(*expectedFiles.toTypedArray()) + // expectedFiles.forEach { minioClient.statObject(StatObjectArgs.builder().bucket(outputBucket).`object`(it).build()) } + } + + private fun uploadInputfile(localPath: String, key: String): String { + s3Client.putObject({ it.bucket(inputBucket).key(key).build() }, Paths.get(localPath)) + + return "s3://$inputBucket/$key" + } +} diff --git a/encore-common/src/test/kotlin/se/svt/oss/encore/service/remotefiles/S3UriConverterTest.kt b/encore-common/src/test/kotlin/se/svt/oss/encore/service/remotefiles/S3UriConverterTest.kt new file mode 100644 index 0000000..c67b9c9 --- /dev/null +++ b/encore-common/src/test/kotlin/se/svt/oss/encore/service/remotefiles/S3UriConverterTest.kt @@ -0,0 +1,70 @@ +// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB +// +// SPDX-License-Identifier: EUPL-1.2 + +package se.svt.oss.encore.service.remotefiles + +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.jupiter.api.Test +import se.svt.oss.encore.service.remotefiles.s3.S3Properties +import se.svt.oss.encore.service.remotefiles.s3.S3UriConverter +import software.amazon.awssdk.regions.Region +import java.net.URI + +class S3UriConverterTest { + + private val s3Properties = S3Properties(enabled = true, anonymousAccess = true) + private val region = Region.of("eu-west-1") + private val s3Uri = URI.create("s3://my-bucket/test2/test1_x264_3100.mp4") + private val s3UriConverter = S3UriConverter(s3Properties, region) + + @Test + fun toHttpReturnsCorrectUri() { + val httpUri = s3UriConverter.toHttp(s3Uri) + assertThat(httpUri) + .isEqualTo("https://my-bucket.s3.eu-west-1.amazonaws.com/test2/test1_x264_3100.mp4") + } + + @Test + fun differentRegionReturnsCorrectUri() { + val s3UriConverter = S3UriConverter(s3Properties, Region.of("eu-north-1")) + + val httpUri = s3UriConverter.toHttp(s3Uri) + assertThat(httpUri) + .isEqualTo("https://my-bucket.s3.eu-north-1.amazonaws.com/test2/test1_x264_3100.mp4") + } + + @Test + fun toHttpWithCustomEndpointReturnsCorrectUri() { + val endpoint = "some-host:1234" + val s3UriConverter = S3UriConverter(s3Properties.copy(endpoint = endpoint), region) + + val httpUri = s3UriConverter.toHttp(s3Uri) + assertThat(httpUri) + .isEqualTo("https://my-bucket.some-host:1234/test2/test1_x264_3100.mp4") + } + + @Test + fun getBucketAndKeyReturnsCorrectValues() { + val (bucket, key) = s3UriConverter.getBucketAndKey(s3Uri) + assertThat(bucket).isEqualTo("my-bucket") + assertThat(key).isEqualTo("test2/test1_x264_3100.mp4") + } + + @Test + fun toHttpNonS3UriThrowsException() { + val uri = URI.create("https://my-bucket/test2/test1_x264_3100.mp4") + assertThatThrownBy { s3UriConverter.toHttp(uri) } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessage("Invalid URI: $uri") + } + + @Test + fun getBucketAndKeyNonS3UriThrowsException() { + val uri = URI.create("https://my-bucket/test2/test1_x264_3100.mp4") + assertThatThrownBy { s3UriConverter.getBucketAndKey(uri) } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessage("Invalid URI: $uri") + } +} diff --git a/encore-common/src/test/resources/application-test-s3.yml b/encore-common/src/test/resources/application-test-s3.yml new file mode 100644 index 0000000..983b8e7 --- /dev/null +++ b/encore-common/src/test/resources/application-test-s3.yml @@ -0,0 +1,3 @@ +remote-files: + s3: + enabled: true \ No newline at end of file diff --git a/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/RedisExtension.kt b/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/RedisExtension.kt index f46ee17..9bf99ae 100644 --- a/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/RedisExtension.kt +++ b/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/RedisExtension.kt @@ -4,7 +4,6 @@ import com.redis.testcontainers.RedisContainer import mu.KotlinLogging import org.junit.jupiter.api.extension.BeforeAllCallback import org.junit.jupiter.api.extension.ExtensionContext -import org.testcontainers.DockerClientFactory import org.testcontainers.utility.DockerImageName private const val DEFAULT_REDIS_DOCKER_IMAGE = "redis:6.2.13" @@ -25,14 +24,4 @@ class RedisExtension : BeforeAllCallback { System.setProperty("spring.data.redis.port", port) } } - - private fun isDockerAvailable(): Boolean = try { - log.info { "Checking for docker..." } - DockerClientFactory.instance().client() - log.info { "Docker is available" } - true - } catch (ex: Throwable) { - log.warn { "Docker is not available! Make sure redis is available as configured by spring.data.redis (default localhost:6379)" } - false - } } diff --git a/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/S3StorageExtension.kt b/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/S3StorageExtension.kt new file mode 100644 index 0000000..dec3e43 --- /dev/null +++ b/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/S3StorageExtension.kt @@ -0,0 +1,32 @@ +// SPDX-FileCopyrightText: 2024 Eyevinn Technology AB +// +// SPDX-License-Identifier: EUPL-1.2 + +package se.svt.oss.encore + +import mu.KotlinLogging +import org.junit.jupiter.api.extension.BeforeAllCallback +import org.junit.jupiter.api.extension.ExtensionContext +import org.testcontainers.containers.localstack.LocalStackContainer +import org.testcontainers.utility.DockerImageName + +class S3StorageExtension : BeforeAllCallback { + private val log = KotlinLogging.logger { } + override fun beforeAll(context: ExtensionContext?) { + if (!isDockerAvailable()) { + log.warn { "Docker is not available! Make sure minio is available as configured by remote-files.s3.*" } + return + } + val localstackImage = DockerImageName.parse("localstack/localstack:3.5.0") + + val localstack: LocalStackContainer = LocalStackContainer(localstackImage) + .withServices(LocalStackContainer.Service.S3) + localstack.start() + + log.info { "localstack endpoint: ${localstack.endpoint}" } + + System.setProperty("aws.accessKeyId", localstack.accessKey) + System.setProperty("aws.secretAccessKey", localstack.secretKey) + System.setProperty("remote-files.s3.endpoint", localstack.endpoint.toString()) + } +} diff --git a/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/TestFixtureUtils.kt b/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/TestFixtureUtils.kt new file mode 100644 index 0000000..77cf6cf --- /dev/null +++ b/encore-common/src/testFixtures/kotlin/se/svt/oss/encore/TestFixtureUtils.kt @@ -0,0 +1,16 @@ +package se.svt.oss.encore + +import mu.KotlinLogging +import org.testcontainers.DockerClientFactory + +private val log = KotlinLogging.logger { } + +fun isDockerAvailable(): Boolean = try { + log.info { "Checking for docker..." } + DockerClientFactory.instance().client() + log.info { "Docker is available" } + true +} catch (ex: Throwable) { + log.warn { "Docker is not available! Make sure redis is available as configured by spring.data.redis (default localhost:6379)" } + false +} diff --git a/encore-web/src/main/resources/logback-json-mask-s3-presign.xml b/encore-web/src/main/resources/logback-json-mask-s3-presign.xml new file mode 100644 index 0000000..6f1292c --- /dev/null +++ b/encore-web/src/main/resources/logback-json-mask-s3-presign.xml @@ -0,0 +1,18 @@ + + + + + + + (X-Amz-[^=]+)=[^&]* + $1=*** + + + + + + + + + + diff --git a/encore-worker/src/main/resources/logback-json-mask-s3-presign.xml b/encore-worker/src/main/resources/logback-json-mask-s3-presign.xml new file mode 100644 index 0000000..6f1292c --- /dev/null +++ b/encore-worker/src/main/resources/logback-json-mask-s3-presign.xml @@ -0,0 +1,18 @@ + + + + + + + (X-Amz-[^=]+)=[^&]* + $1=*** + + + + + + + + + +