From fb3c612ec6575395b3312d2da9dcd119ec917ac6 Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Wed, 6 Oct 2021 22:57:14 +0200 Subject: [PATCH] Add AWS S3 tests --- .github/workflows/ci.yml | 5 + .../kafka/backup/s3/BackupClientSpec.scala | 105 ++++++++++++++++-- .../backup/s3/MinioBackupClientSpec.scala | 15 +++ .../backup/s3/RealS3BackupClientSpec.scala | 19 ++++ build.sbt | 10 +- core-s3/src/test/resources/logback.xml | 14 +++ .../stream/alpakka/s3/GeneratorsSpec.scala | 26 ++++- .../aiven/guardian/kafka/s3/Generators.scala | 95 +++++++++++++--- .../aiven/guardian/kafka/s3/MinioS3Test.scala | 2 +- 9 files changed, 258 insertions(+), 33 deletions(-) create mode 100644 backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MinioBackupClientSpec.scala create mode 100644 backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala create mode 100644 core-s3/src/test/resources/logback.xml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c5157b4c..f80c7a74 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,6 +14,11 @@ on: branches: [main] env: + ALPAKKA_S3_AWS_CREDENTIALS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY }} + ALPAKKA_S3_REGION_DEFAULT_REGION: us-west-2 + ALPAKKA_S3_AWS_CREDENTIALS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY }} + ALPAKKA_S3_AWS_CREDENTIALS_PROVIDER: static + ALPAKKA_S3_REGION_PROVIDER: static GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} jobs: diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala index 1bc9722a..51a42d52 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala @@ -1,51 +1,135 @@ package io.aiven.guardian.kafka.backup.s3 -import akka.actor.ActorSystem import akka.stream.Attributes +import akka.stream.alpakka.s3.BucketAccess import akka.stream.alpakka.s3.S3Attributes +import akka.stream.alpakka.s3.S3Settings import akka.stream.alpakka.s3.scaladsl.S3 import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Sink +import akka.testkit.TestKitBase import com.softwaremill.diffx.generic.auto._ import com.softwaremill.diffx.scalatest.DiffMatcher.matchTo +import com.typesafe.scalalogging.StrictLogging import io.aiven.guardian.akka.AkkaHttpTestKit -import io.aiven.guardian.akka.AnyPropTestKit import io.aiven.guardian.kafka.Generators._ import io.aiven.guardian.kafka.ScalaTestConstants import io.aiven.guardian.kafka.codecs.Circe._ import io.aiven.guardian.kafka.models.ReducedConsumerRecord import io.aiven.guardian.kafka.s3.Config import io.aiven.guardian.kafka.s3.Generators._ -import io.aiven.guardian.kafka.s3.MinioS3Test import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} import io.aiven.guardian.kafka.s3.errors.S3Errors import org.mdedetrich.akka.stream.support.CirceStreamSupport +import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.must.Matchers +import org.scalatest.propspec.AnyPropSpecLike import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ import scala.language.postfixOps import java.time.OffsetDateTime +import java.util.concurrent.ConcurrentLinkedQueue -class BackupClientSpec - extends AnyPropTestKit(ActorSystem("S3BackupClientSpec")) +trait BackupClientSpec + extends TestKitBase + with AnyPropSpecLike with AkkaHttpTestKit with Matchers with ScalaCheckPropertyChecks with ScalaTestConstants - with MinioS3Test - with Config { + with Config + with BeforeAndAfterAll + with StrictLogging { + + implicit val ec: ExecutionContext = system.getDispatcher val ThrottleElements: Int = 100 val ThrottleAmount: FiniteDuration = 1 millis + val s3Settings: S3Settings + + implicit lazy val s3Attrs: Attributes = S3Attributes.settings(s3Settings) + + /** Whether to use virtual dot host, Typically this is disabled when testing against real services because they + * require domain name verification + */ + val useVirtualDotHost: Boolean + + /** A prefix that will get added to each generated bucket in the test, this is to track the buckets that are + * specifically created by the test + */ + lazy val bucketPrefix: Option[String] = None + + private val bucketsToCleanup = new ConcurrentLinkedQueue[String]() + + def createBucket(bucket: String): Future[Unit] = + for { + _ <- S3.makeBucket(bucket) + _ = if (enableCleanup.isDefined) + bucketsToCleanup.add(bucket) + } yield () + + /** Whether to enable cleanup of buckets after tests are run and if so the initial delay to wait after the test + */ + lazy val enableCleanup: Option[FiniteDuration] = None + + /** The MaxTimeout when cleaning up all of the buckets during `afterAll` + */ + lazy val MaxCleanupTimeout: FiniteDuration = 10 minutes + + private def cleanBucket(bucket: String): Future[Unit] = (for { + check <- S3.checkIfBucketExists(bucket) + _ <- check match { + case BucketAccess.AccessDenied => + Future { + logger.warn( + s"Cannot delete bucket: $bucket due to having access denied. Please look into this as it can fill up your AWS account" + ) + } + case BucketAccess.AccessGranted => + logger.info(s"Cleaning up bucket: $bucket") + for { + _ <- S3.deleteBucketContents(bucket).runWith(Sink.ignore) + multiParts <- S3.listMultipartUpload(bucket, None).runWith(Sink.seq) + _ <- Future.sequence(multiParts.map { part => + for { + _ <- S3.deleteUpload(bucket, part.key, part.uploadId) + } yield () + }) + _ <- S3.deleteBucket(bucket) + } yield () + case BucketAccess.NotExists => + Future { + logger.info(s"Not deleting bucket: $bucket since it no longer exists") + } + } + + } yield ()).recover { case util.control.NonFatal(error) => + logger.error(s"Error deleting bucket: $bucket", error) + } + + override def afterAll(): Unit = + enableCleanup match { + case Some(initialDelay) => + def cleanAllBuckets = { + val futures = bucketsToCleanup.asScala.toList.distinct.map(cleanBucket) + Future.sequence(futures) + } + + Await.result(akka.pattern.after(initialDelay)(cleanAllBuckets), MaxCleanupTimeout) + case None => () + } + property("backup method completes flow correctly for all valid Kafka events") { - forAll(kafkaDataWithTimePeriodsGen(), s3ConfigGen) { + forAll(kafkaDataWithTimePeriodsGen(), s3ConfigGen(useVirtualDotHost, bucketPrefix)) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, s3Config: S3Config) => + logger.info(s"Data bucket is ${s3Config.dataBucket}") val backupClient = new MockedS3BackupClientInterface(kafkaDataWithTimePeriod.data, kafkaDataWithTimePeriod.periodSlice, s3Config, @@ -53,9 +137,6 @@ class BackupClientSpec Some(_.throttle(ThrottleElements, ThrottleAmount)) ) - implicit val ec: ExecutionContext = ExecutionContext.global - implicit val s3Attrs: Attributes = S3Attributes.settings(s3Settings) - val delay = (ThrottleAmount * (kafkaDataWithTimePeriod.data.size / ThrottleElements) * 1.2) + (10 millis) match { case fd: FiniteDuration => fd @@ -63,7 +144,7 @@ class BackupClientSpec } val calculatedFuture = for { - _ <- S3.makeBucket(s3Config.dataBucket) + _ <- createBucket(s3Config.dataBucket) _ <- backupClient.backup.run() _ <- akka.pattern.after(delay)(Future.successful(())) bucketContents <- diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MinioBackupClientSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MinioBackupClientSpec.scala new file mode 100644 index 00000000..d733e91d --- /dev/null +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MinioBackupClientSpec.scala @@ -0,0 +1,15 @@ +package io.aiven.guardian.kafka.backup.s3 + +import akka.actor.ActorSystem +import io.aiven.guardian.akka.AnyPropTestKit +import io.aiven.guardian.kafka.s3.MinioS3Test + +class MinioBackupClientSpec + extends AnyPropTestKit(ActorSystem("MinioS3BackupClientSpec")) + with BackupClientSpec + with MinioS3Test { + + /** Since Minio doesn't do DNS name verification we can enable this + */ + override lazy val useVirtualDotHost: Boolean = true +} diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala new file mode 100644 index 00000000..b6ae88a6 --- /dev/null +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala @@ -0,0 +1,19 @@ +package io.aiven.guardian.kafka.backup.s3 + +import akka.actor.ActorSystem +import akka.stream.alpakka.s3.S3Settings +import io.aiven.guardian.akka.AnyPropTestKit + +import scala.concurrent.duration._ +import scala.language.postfixOps + +class RealS3BackupClientSpec extends AnyPropTestKit(ActorSystem("RealS3BackupClientSpec")) with BackupClientSpec { + override lazy val s3Settings: S3Settings = S3Settings() + + /** Virtual Dot Host in bucket names are disabled because you need an actual DNS certificate otherwise AWS will fail + * on bucket creation + */ + override lazy val useVirtualDotHost: Boolean = false + override lazy val bucketPrefix: Option[String] = Some("guardian-") + override lazy val enableCleanup: Option[FiniteDuration] = Some(5 seconds) +} diff --git a/build.sbt b/build.sbt index d7b9ea8c..080eb720 100644 --- a/build.sbt +++ b/build.sbt @@ -10,7 +10,7 @@ ThisBuild / resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/c val akkaVersion = "2.6.16" val akkaHttpVersion = "10.2.6" val alpakkaKafkaVersion = "2.1.1" -val alpakkaVersion = "3.0.2+28-88f3fdf3+20211001-1529-SNAPSHOT" +val alpakkaVersion = "3.0.2+27-27fdecf1+20211006-1009-SNAPSHOT" val quillJdbcMonixVersion = "3.7.2" val postgresqlJdbcVersion = "42.2.24" val scalaLoggingVersion = "3.9.4" @@ -249,6 +249,14 @@ ThisBuild / scalafixScalaBinaryVersion := scalaBinaryVersion.value ThisBuild / semanticdbEnabled := true +ThisBuild / githubWorkflowEnv ++= Map( + "ALPAKKA_S3_REGION_PROVIDER" -> "static", + "ALPAKKA_S3_REGION_DEFAULT_REGION" -> "us-west-2", + "ALPAKKA_S3_AWS_CREDENTIALS_PROVIDER" -> "static", + "ALPAKKA_S3_AWS_CREDENTIALS_ACCESS_KEY_ID" -> "${{ secrets.AWS_ACCESS_KEY }}", + "ALPAKKA_S3_AWS_CREDENTIALS_SECRET_ACCESS_KEY" -> "${{ secrets.AWS_SECRET_KEY }}" +) + import ReleaseTransformations._ releaseCrossBuild := true diff --git a/core-s3/src/test/resources/logback.xml b/core-s3/src/test/resources/logback.xml new file mode 100644 index 00000000..198bd335 --- /dev/null +++ b/core-s3/src/test/resources/logback.xml @@ -0,0 +1,14 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n + + + + + + + + + + diff --git a/core-s3/src/test/scala/akka/stream/alpakka/s3/GeneratorsSpec.scala b/core-s3/src/test/scala/akka/stream/alpakka/s3/GeneratorsSpec.scala index 6ec99b5a..d06a013a 100644 --- a/core-s3/src/test/scala/akka/stream/alpakka/s3/GeneratorsSpec.scala +++ b/core-s3/src/test/scala/akka/stream/alpakka/s3/GeneratorsSpec.scala @@ -2,13 +2,35 @@ package akka.stream.alpakka.s3 import akka.actor.ActorSystem import io.aiven.guardian.kafka.s3.Generators +import org.scalacheck.Gen import org.scalatest.matchers.must.Matchers import org.scalatest.propspec.AnyPropSpec import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks class GeneratorsSpec extends AnyPropSpec with Matchers with ScalaCheckPropertyChecks { - property("Bucket name generators generates valid bucket names according to S3Settings") { - forAll(Generators.bucketNameGen) { bucket => + property("Bucket name generators generates valid bucket names according to S3Settings with virtualDotHost") { + forAll(Generators.bucketNameGen(useVirtualDotHost = true)) { bucket => + noException must be thrownBy BucketAndKey.validateBucketName(bucket, S3Settings(ActorSystem())) + } + } + + property("Bucket name generators generates valid bucket names according to S3Settings without virtualDotHost") { + forAll(Generators.bucketNameGen(useVirtualDotHost = false)) { bucket => + noException must be thrownBy BucketAndKey.validateBucketName(bucket, S3Settings(ActorSystem())) + } + } + + def withPrefixGen(useVirtualDotHost: Boolean): Gen[String] = for { + range <- Gen.choose(2, Generators.MaxBucketLength - 2) + firstChar <- Generators.bucketLetterOrNumberCharGen + chars <- Gen.listOfN(range, Generators.bucketAllCharGen(useVirtualDotHost = false)) + bucketName <- Generators.bucketNameGen(useVirtualDotHost, Some((firstChar +: chars).mkString)) + } yield bucketName + + property( + "Bucket name generators generates valid bucket names according to S3Settings with virtualDotHost and prefix" + ) { + forAll(withPrefixGen(useVirtualDotHost = true)) { bucket => noException must be thrownBy BucketAndKey.validateBucketName(bucket, S3Settings(ActorSystem())) } } diff --git a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Generators.scala b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Generators.scala index 8f043d5d..5fa86ee6 100644 --- a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Generators.scala +++ b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Generators.scala @@ -6,6 +6,8 @@ import org.scalacheck.Gen import scala.annotation.nowarn object Generators { + val MaxBucketLength: Int = 63 + // See https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html for valid // bucketnames @@ -14,12 +16,17 @@ object Generators { (1, Gen.alphaLowerChar) ) - lazy val bucketAllCharGen: Gen[Char] = Gen.frequency( - (10, Gen.alphaLowerChar), - (1, Gen.const('.')), - (1, Gen.const('-')), - (1, Gen.numChar) - ) + def bucketAllCharGen(useVirtualDotHost: Boolean): Gen[Char] = { + val base = List( + (10, Gen.alphaLowerChar), + (1, Gen.const('-')), + (1, Gen.numChar) + ) + + val frequency = if (useVirtualDotHost) (1, Gen.const('.')) +: base else base + + Gen.frequency(frequency: _*) + } @nowarn("msg=not.*?exhaustive") private def checkInvalidDuplicateChars(chars: List[Char]): Boolean = @@ -27,29 +34,83 @@ object Generators { !(before == '.' && after == '.' || before == '-' && after == '.' || before == '.' && after == '-') } - lazy val bucketNameGen: Gen[String] = { + private def checkAlphaChar(c: Char): Boolean = + c >= 'a' && c <= 'z' + + private def allCharCheck(useVirtualDotHost: Boolean, string: String): Boolean = + if (useVirtualDotHost) { + string.forall(char => Character.isDigit(char) || checkAlphaChar(char) || char == '-' || char == '.') && + checkInvalidDuplicateChars(string.toList) + } else + string.forall(char => Character.isDigit(char) || checkAlphaChar(char) || char == '-') + + def validatePrefix(useVirtualDotHost: Boolean, prefix: Option[String]): Option[String] = { + val withoutWhitespace = prefix match { + case Some(value) if value.trim == "" => None + case Some(value) => Some(value) + case None => None + } + + withoutWhitespace match { + case Some(value) if !(Character.isDigit(value.head) || checkAlphaChar(value.head)) => + throw new IllegalArgumentException( + s"Invalid starting digit for prefix $value, ${value.head} needs to be an alpha char or digit" + ) + case Some(value) if value.length > 1 => + if (!allCharCheck(useVirtualDotHost, value.drop(1))) + throw new IllegalArgumentException( + s"Prefix $value contains invalid characters" + ) + case Some(value) if value.length > MaxBucketLength - 1 => + throw new IllegalArgumentException( + s"Prefix is too long, it has size ${value.length} where as the max bucket size is $MaxBucketLength" + ) + case _ => () + } + + withoutWhitespace + } + + def bucketNameGen(useVirtualDotHost: Boolean, prefix: Option[String] = None): Gen[String] = { + val finalPrefix = validatePrefix(useVirtualDotHost, prefix) + for { - range <- Gen.choose(3, 63) + range <- { + val maxLength = finalPrefix match { + case Some(p) => MaxBucketLength - p.length + case None => MaxBucketLength + } + + if (maxLength > 3) + Gen.choose(3, maxLength) + else + Gen.const(maxLength) + } + startString = finalPrefix.getOrElse("") + bucketName <- range match { case 3 => for { first <- bucketLetterOrNumberCharGen - second <- bucketAllCharGen + second <- bucketAllCharGen(useVirtualDotHost) third <- bucketLetterOrNumberCharGen - } yield List(first, second, third).mkString + } yield startString ++ List(first, second, third).mkString case _ => for { - first <- bucketLetterOrNumberCharGen - last <- bucketLetterOrNumberCharGen - middle <- Gen.listOfN(range - 2, bucketAllCharGen).filter(checkInvalidDuplicateChars) - } yield first.toString ++ middle.mkString ++ last.toString + first <- bucketLetterOrNumberCharGen + last <- bucketLetterOrNumberCharGen + middle <- { + val gen = Gen.listOfN(range - 2, bucketAllCharGen(useVirtualDotHost)) + if (useVirtualDotHost) gen.filter(checkInvalidDuplicateChars) else gen + } + } yield startString ++ first.toString ++ middle.mkString ++ last.toString } } yield bucketName } - val s3ConfigGen: Gen[S3Config] = (for { - dataBucket <- bucketNameGen - compactionBucket <- bucketNameGen + def s3ConfigGen(useVirtualDotHost: Boolean, prefix: Option[String] = None): Gen[S3Config] = (for { + dataBucket <- bucketNameGen(useVirtualDotHost, prefix) + compactionBucket <- bucketNameGen(useVirtualDotHost, prefix) } yield S3Config(dataBucket, compactionBucket)).filter(config => config.dataBucket != config.compactionBucket) } diff --git a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/MinioS3Test.scala b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/MinioS3Test.scala index 97a26783..d2700d0e 100644 --- a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/MinioS3Test.scala +++ b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/MinioS3Test.scala @@ -24,5 +24,5 @@ trait MinioS3Test extends ForAllTestContainer with TestKitBase { this: Suite => }) .withAccessStyle(AccessStyle.PathAccessStyle) - override val container: MinioContainer = new MinioContainer(S3DummyAccessKey, S3DummySecretKey) + override lazy val container: MinioContainer = new MinioContainer(S3DummyAccessKey, S3DummySecretKey) }