Add AWS S3 tests
mdedetrich committed Oct 7, 2021
1 parent f487d1d commit fb3c612
Showing 9 changed files with 258 additions and 33 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
@@ -14,6 +14,11 @@ on:
branches: [main]

env:
ALPAKKA_S3_AWS_CREDENTIALS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY }}
ALPAKKA_S3_REGION_DEFAULT_REGION: us-west-2
ALPAKKA_S3_AWS_CREDENTIALS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY }}
ALPAKKA_S3_AWS_CREDENTIALS_PROVIDER: static
ALPAKKA_S3_REGION_PROVIDER: static
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

jobs:
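The `ALPAKKA_S3_*` variables above are read through Alpakka's `alpakka.s3` configuration section, which the new specs resolve via `S3Settings()`. A minimal sketch of that resolution, assuming Alpakka's usual environment-variable overrides (the `SettingsCheck` object and its printed output are illustrative, not part of the commit):

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.S3Settings

// Illustrative only: with the ALPAKKA_S3_* variables exported, S3Settings()
// resolves the alpakka.s3 config section, giving the tests static credentials
// and the us-west-2 region without any code changes.
object SettingsCheck extends App {
  implicit val system: ActorSystem = ActorSystem("settings-check")
  val settings = S3Settings()
  println(settings.s3RegionProvider.getRegion) // expected: us-west-2
  system.terminate()
}
```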
@@ -1,69 +1,150 @@
package io.aiven.guardian.kafka.backup.s3

import akka.actor.ActorSystem
import akka.stream.Attributes
import akka.stream.alpakka.s3.BucketAccess
import akka.stream.alpakka.s3.S3Attributes
import akka.stream.alpakka.s3.S3Settings
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.testkit.TestKitBase
import com.softwaremill.diffx.generic.auto._
import com.softwaremill.diffx.scalatest.DiffMatcher.matchTo
import com.typesafe.scalalogging.StrictLogging
import io.aiven.guardian.akka.AkkaHttpTestKit
import io.aiven.guardian.akka.AnyPropTestKit
import io.aiven.guardian.kafka.Generators._
import io.aiven.guardian.kafka.ScalaTestConstants
import io.aiven.guardian.kafka.codecs.Circe._
import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import io.aiven.guardian.kafka.s3.Config
import io.aiven.guardian.kafka.s3.Generators._
import io.aiven.guardian.kafka.s3.MinioS3Test
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
import io.aiven.guardian.kafka.s3.errors.S3Errors
import org.mdedetrich.akka.stream.support.CirceStreamSupport
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.must.Matchers
import org.scalatest.propspec.AnyPropSpecLike
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks

import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.language.postfixOps

import java.time.OffsetDateTime
import java.util.concurrent.ConcurrentLinkedQueue

class BackupClientSpec
extends AnyPropTestKit(ActorSystem("S3BackupClientSpec"))
trait BackupClientSpec
extends TestKitBase
with AnyPropSpecLike
with AkkaHttpTestKit
with Matchers
with ScalaCheckPropertyChecks
with ScalaTestConstants
with MinioS3Test
with Config {
with Config
with BeforeAndAfterAll
with StrictLogging {

implicit val ec: ExecutionContext = system.getDispatcher

val ThrottleElements: Int = 100
val ThrottleAmount: FiniteDuration = 1 millis

val s3Settings: S3Settings

implicit lazy val s3Attrs: Attributes = S3Attributes.settings(s3Settings)

/** Whether to use virtual dot host. Typically this is disabled when testing against real services because they
* require domain name verification
*/
val useVirtualDotHost: Boolean

/** A prefix that will be added to each generated bucket in the test. This is used to track the buckets that are
* specifically created by the test
*/
lazy val bucketPrefix: Option[String] = None

private val bucketsToCleanup = new ConcurrentLinkedQueue[String]()

def createBucket(bucket: String): Future[Unit] =
for {
_ <- S3.makeBucket(bucket)
_ = if (enableCleanup.isDefined)
bucketsToCleanup.add(bucket)
} yield ()

/** Whether to enable cleanup of buckets after tests are run and, if so, the initial delay to wait after the test
*/
lazy val enableCleanup: Option[FiniteDuration] = None

/** The maximum timeout when cleaning up all of the buckets during `afterAll`
*/
lazy val MaxCleanupTimeout: FiniteDuration = 10 minutes

private def cleanBucket(bucket: String): Future[Unit] = (for {
check <- S3.checkIfBucketExists(bucket)
_ <- check match {
case BucketAccess.AccessDenied =>
Future {
logger.warn(
s"Cannot delete bucket: $bucket due to having access denied. Please look into this as it can fill up your AWS account"
)
}
case BucketAccess.AccessGranted =>
logger.info(s"Cleaning up bucket: $bucket")
for {
_ <- S3.deleteBucketContents(bucket).runWith(Sink.ignore)
multiParts <- S3.listMultipartUpload(bucket, None).runWith(Sink.seq)
_ <- Future.sequence(multiParts.map { part =>
for {
_ <- S3.deleteUpload(bucket, part.key, part.uploadId)
} yield ()
})
_ <- S3.deleteBucket(bucket)
} yield ()
case BucketAccess.NotExists =>
Future {
logger.info(s"Not deleting bucket: $bucket since it no longer exists")
}
}

} yield ()).recover { case util.control.NonFatal(error) =>
logger.error(s"Error deleting bucket: $bucket", error)
}

override def afterAll(): Unit =
enableCleanup match {
case Some(initialDelay) =>
def cleanAllBuckets = {
val futures = bucketsToCleanup.asScala.toList.distinct.map(cleanBucket)
Future.sequence(futures)
}

Await.result(akka.pattern.after(initialDelay)(cleanAllBuckets), MaxCleanupTimeout)
case None => ()
}

property("backup method completes flow correctly for all valid Kafka events") {
forAll(kafkaDataWithTimePeriodsGen(), s3ConfigGen) {
forAll(kafkaDataWithTimePeriodsGen(), s3ConfigGen(useVirtualDotHost, bucketPrefix)) {
(kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, s3Config: S3Config) =>
logger.info(s"Data bucket is ${s3Config.dataBucket}")
val backupClient = new MockedS3BackupClientInterface(kafkaDataWithTimePeriod.data,
kafkaDataWithTimePeriod.periodSlice,
s3Config,
Some(s3Settings),
Some(_.throttle(ThrottleElements, ThrottleAmount))
)

implicit val ec: ExecutionContext = ExecutionContext.global
implicit val s3Attrs: Attributes = S3Attributes.settings(s3Settings)

val delay =
(ThrottleAmount * (kafkaDataWithTimePeriod.data.size / ThrottleElements) * 1.2) + (10 millis) match {
case fd: FiniteDuration => fd
case _: Duration.Infinite => throw new Exception("Expected Finite Duration")
}

val calculatedFuture = for {
_ <- S3.makeBucket(s3Config.dataBucket)
_ <- createBucket(s3Config.dataBucket)
_ <- backupClient.backup.run()
_ <- akka.pattern.after(delay)(Future.successful(()))
bucketContents <-
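For intuition about the throttle-derived delay in the spec above, a small self-contained sketch of the same arithmetic (the 5,000-record count is a made-up example, not a value from the commit):

```scala
import scala.concurrent.duration._
import scala.language.postfixOps

object DelayExample extends App {
  val ThrottleElements: Int          = 100
  val ThrottleAmount: FiniteDuration = 1 millis
  val recordCount: Int               = 5000 // hypothetical dataset size

  // Same shape as the spec: total throttle time across all batches, padded by 20% plus a 10 ms buffer.
  val delay = (ThrottleAmount * (recordCount / ThrottleElements) * 1.2) + (10 millis) match {
    case fd: FiniteDuration   => fd
    case _: Duration.Infinite => throw new Exception("Expected Finite Duration")
  }

  println(delay) // should print "70 milliseconds" for this example
}
```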
@@ -0,0 +1,15 @@
package io.aiven.guardian.kafka.backup.s3

import akka.actor.ActorSystem
import io.aiven.guardian.akka.AnyPropTestKit
import io.aiven.guardian.kafka.s3.MinioS3Test

class MinioBackupClientSpec
extends AnyPropTestKit(ActorSystem("MinioS3BackupClientSpec"))
with BackupClientSpec
with MinioS3Test {

/** Since Minio doesn't do DNS name verification, we can enable virtual dot host
*/
override lazy val useVirtualDotHost: Boolean = true
}
@@ -0,0 +1,19 @@
package io.aiven.guardian.kafka.backup.s3

import akka.actor.ActorSystem
import akka.stream.alpakka.s3.S3Settings
import io.aiven.guardian.akka.AnyPropTestKit

import scala.concurrent.duration._
import scala.language.postfixOps

class RealS3BackupClientSpec extends AnyPropTestKit(ActorSystem("RealS3BackupClientSpec")) with BackupClientSpec {
override lazy val s3Settings: S3Settings = S3Settings()

/** Virtual dot host in bucket names is disabled because you need an actual DNS certificate, otherwise AWS will fail
* on bucket creation
*/
override lazy val useVirtualDotHost: Boolean = false
override lazy val bucketPrefix: Option[String] = Some("guardian-")
override lazy val enableCleanup: Option[FiniteDuration] = Some(5 seconds)
}
10 changes: 9 additions & 1 deletion build.sbt
@@ -10,7 +10,7 @@ ThisBuild / resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/c
val akkaVersion = "2.6.16"
val akkaHttpVersion = "10.2.6"
val alpakkaKafkaVersion = "2.1.1"
val alpakkaVersion = "3.0.2+28-88f3fdf3+20211001-1529-SNAPSHOT"
val alpakkaVersion = "3.0.2+27-27fdecf1+20211006-1009-SNAPSHOT"
val quillJdbcMonixVersion = "3.7.2"
val postgresqlJdbcVersion = "42.2.24"
val scalaLoggingVersion = "3.9.4"
@@ -249,6 +249,14 @@ ThisBuild / scalafixScalaBinaryVersion := scalaBinaryVersion.value

ThisBuild / semanticdbEnabled := true

ThisBuild / githubWorkflowEnv ++= Map(
"ALPAKKA_S3_REGION_PROVIDER" -> "static",
"ALPAKKA_S3_REGION_DEFAULT_REGION" -> "us-west-2",
"ALPAKKA_S3_AWS_CREDENTIALS_PROVIDER" -> "static",
"ALPAKKA_S3_AWS_CREDENTIALS_ACCESS_KEY_ID" -> "${{ secrets.AWS_ACCESS_KEY }}",
"ALPAKKA_S3_AWS_CREDENTIALS_SECRET_ACCESS_KEY" -> "${{ secrets.AWS_SECRET_KEY }}"
)

import ReleaseTransformations._

releaseCrossBuild := true
14 changes: 14 additions & 0 deletions core-s3/src/test/resources/logback.xml
@@ -0,0 +1,14 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>

<logger name="org.testcontainers" level="INFO"/>
<logger name="com.github.dockerjava" level="WARN"/>
</configuration>
26 changes: 24 additions & 2 deletions core-s3/src/test/scala/akka/stream/alpakka/s3/GeneratorsSpec.scala
@@ -2,13 +2,35 @@ package akka.stream.alpakka.s3

import akka.actor.ActorSystem
import io.aiven.guardian.kafka.s3.Generators
import org.scalacheck.Gen
import org.scalatest.matchers.must.Matchers
import org.scalatest.propspec.AnyPropSpec
import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks

class GeneratorsSpec extends AnyPropSpec with Matchers with ScalaCheckPropertyChecks {
property("Bucket name generators generates valid bucket names according to S3Settings") {
forAll(Generators.bucketNameGen) { bucket =>
property("Bucket name generators generates valid bucket names according to S3Settings with virtualDotHost") {
forAll(Generators.bucketNameGen(useVirtualDotHost = true)) { bucket =>
noException must be thrownBy BucketAndKey.validateBucketName(bucket, S3Settings(ActorSystem()))
}
}

property("Bucket name generators generates valid bucket names according to S3Settings without virtualDotHost") {
forAll(Generators.bucketNameGen(useVirtualDotHost = false)) { bucket =>
noException must be thrownBy BucketAndKey.validateBucketName(bucket, S3Settings(ActorSystem()))
}
}

def withPrefixGen(useVirtualDotHost: Boolean): Gen[String] = for {
range <- Gen.choose(2, Generators.MaxBucketLength - 2)
firstChar <- Generators.bucketLetterOrNumberCharGen
chars <- Gen.listOfN(range, Generators.bucketAllCharGen(useVirtualDotHost = false))
bucketName <- Generators.bucketNameGen(useVirtualDotHost, Some((firstChar +: chars).mkString))
} yield bucketName

property(
"Bucket name generators generates valid bucket names according to S3Settings with virtualDotHost and prefix"
) {
forAll(withPrefixGen(useVirtualDotHost = true)) { bucket =>
noException must be thrownBy BucketAndKey.validateBucketName(bucket, S3Settings(ActorSystem()))
}
}
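As a usage note for the prefixed generator exercised above, a minimal sketch (the positional `Some("guardian-")` argument mirrors the bucketPrefix used by RealS3BackupClientSpec; the `BucketNameSamples` object is illustrative, not part of the commit):

```scala
import io.aiven.guardian.kafka.s3.Generators
import org.scalacheck.Gen

// Illustrative only: sample a few bucket names carrying the same "guardian-"
// prefix RealS3BackupClientSpec uses, to eyeball the shape of generated names.
object BucketNameSamples extends App {
  val gen: Gen[String] = Generators.bucketNameGen(useVirtualDotHost = false, Some("guardian-"))
  (1 to 5).flatMap(_ => gen.sample).foreach(println)
}
```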
