diff --git a/backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala b/backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala index 3996d246..69b1b5c6 100644 --- a/backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala +++ b/backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala @@ -1,5 +1,6 @@ package io.aiven.guardian.kafka.backup.gcs +import akka.actor.ActorSystem import akka.http.scaladsl.model.ContentTypes import akka.stream.alpakka.google.GoogleAttributes import akka.stream.alpakka.google.GoogleSettings @@ -15,9 +16,12 @@ import io.aiven.guardian.kafka.gcs.configs.{GCS => GCSConfig} import scala.concurrent.ExecutionContext import scala.concurrent.Future +// TODO: GCS implementation currently does not work correctly because of inability of current GCS implementation in +// Alpakka to allows us to commit Kafka cursor whenever chunks are uploaded class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[GoogleSettings])(implicit override val kafkaClientInterface: T, override val backupConfig: Backup, + override val system: ActorSystem, gcsConfig: GCSConfig ) extends BackupClientInterface[T] { @@ -25,12 +29,22 @@ class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[Google override type BackupResult = Option[StorageObject] - override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = { + override type CurrentState = Nothing + + override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None) + + override def backupToStorageSink(key: String, + currentState: Option[Nothing] + ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = { val base = GCStorage .resumableUpload(gcsConfig.dataBucket, key, ContentTypes.`application/json`) .mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic)) - maybeGoogleSettings.fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings))) + maybeGoogleSettings + .fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings))) + .contramap[(ByteString, kafkaClientInterface.CursorContext)] { case (byteString, _) => + byteString + } } } diff --git a/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala b/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala index fa44c1d7..3da076cb 100644 --- a/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala +++ b/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala @@ -1,40 +1,168 @@ package io.aiven.guardian.kafka.backup.s3 +import akka.Done +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.alpakka.s3.FailedUploadPart import akka.stream.alpakka.s3.MultipartUploadResult +import akka.stream.alpakka.s3.Part import akka.stream.alpakka.s3.S3Attributes import akka.stream.alpakka.s3.S3Headers import akka.stream.alpakka.s3.S3Settings +import akka.stream.alpakka.s3.SuccessfulUploadPart +import akka.stream.alpakka.s3.UploadPartResponse import akka.stream.alpakka.s3.scaladsl.S3 import akka.stream.scaladsl._ import akka.util.ByteString +import com.typesafe.scalalogging.StrictLogging import io.aiven.guardian.kafka.KafkaClientInterface import io.aiven.guardian.kafka.backup.BackupClientInterface import io.aiven.guardian.kafka.backup.configs.Backup import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} 
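A minimal standalone sketch (not part of this patch) of the `Sink.contramap` pattern the GCS client above relies on: a storage sink that only understands raw bytes is adapted to a stream that also carries a Kafka cursor context, with the context simply dropped because this backend cannot commit per uploaded chunk. The object and value names here are illustrative only.

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

object ContramapSketch extends App {
  implicit val system: ActorSystem = ActorSystem("contramap-sketch")

  // A storage-style sink that only understands raw bytes.
  val bytesOnly: Sink[ByteString, Future[Int]] =
    Sink.fold[Int, ByteString](0)((acc, bs) => acc + bs.length)

  // Adapt it to a (ByteString, cursor) stream, discarding the cursor,
  // mirroring the GCS case where chunk-level commits are not possible.
  val withContext: Sink[(ByteString, Long), Future[Int]] =
    bytesOnly.contramap[(ByteString, Long)] { case (bytes, _) => bytes }

  Source(List((ByteString("a"), 1L), (ByteString("bc"), 2L)))
    .runWith(withContext)
    .foreach { total =>
      println(s"wrote $total bytes")
      system.terminate()
    }(system.dispatcher)
}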
+import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future +import java.time.Instant + +final case class CurrentS3State(uploadId: String, parts: Seq[Part]) + class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings])(implicit override val kafkaClientInterface: T, override val backupConfig: Backup, + override val system: ActorSystem, s3Config: S3Config, s3Headers: S3Headers -) extends BackupClientInterface[T] { +) extends BackupClientInterface[T] + with StrictLogging { override def empty: () => Future[Option[MultipartUploadResult]] = () => Future.successful(None) override type BackupResult = Option[MultipartUploadResult] - override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = { - val base = S3 - .multipartUploadWithHeaders( - s3Config.dataBucket, - key, - s3Headers = s3Headers, - chunkingParallelism = 1 // Parallelism is pointless for this usecase since we are directly streaming from Kafka + override type CurrentState = CurrentS3State + + override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = { + implicit val ec: ExecutionContext = system.classicSystem.getDispatcher + + val baseListMultipart = S3.listMultipartUpload(s3Config.dataBucket, None) + + for { + incompleteUploads <- + maybeS3Settings + .fold(baseListMultipart)(s3Settings => baseListMultipart.withAttributes(S3Attributes.settings(s3Settings))) + .runWith(Sink.seq) + keys = incompleteUploads.filter(_.key == key) + result <- if (keys.isEmpty) + Future.successful(None) + else { + val listMultipartUploads = keys match { + case Seq(single) => + logger.info( + s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}" + ) + single + case rest => + val last = rest.minBy(_.initiated)(Ordering[Instant].reverse) + logger.warn( + s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}" + ) + last + } + val uploadId = listMultipartUploads.uploadId + val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId) + + for { + parts <- maybeS3Settings + .fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings))) + .runWith(Sink.seq) + + finalParts = parts.lastOption match { + case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize => + parts + case _ => + // We drop the last part here since its broken + parts.dropRight(1) + } + } yield Some(CurrentS3State(uploadId, finalParts.map(_.toPart))) + } + } yield result + + } + + private[s3] def failureSink + : Sink[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] = Sink + .foreach[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (failedPart, _) => + logger.warn( + s"Failed to upload a chunk into S3 with bucket: ${failedPart.multipartUpload.bucket}, key: ${failedPart.multipartUpload.key}, uploadId: ${failedPart.multipartUpload.uploadId} and partNumber: ${failedPart.partNumber}", + failedPart.exception ) - .mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic)) + } + + private[s3] def successSink + : Sink[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] = + kafkaClientInterface.commitCursor + .contramap[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (_, cursors) => + 
kafkaClientInterface.batchCursorContext(cursors) + } + + private[s3] def kafkaBatchSink + : Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] = { + + val success = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])] + .collectType[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] + .wireTap { data => + val (part, _) = data + logger.info( + s"Committing kafka cursor for uploadId:${part.multipartUpload.uploadId} key: ${part.multipartUpload.key} and S3 part: ${part.partNumber}" + ) + } + .toMat(successSink)(Keep.none) + + val failure = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])] + .collectType[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] + .toMat(failureSink)(Keep.none) + + Sink.combine(success, failure)(Broadcast(_)) + } + + override def backupToStorageSink(key: String, + currentState: Option[CurrentS3State] + ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = { + + // Note that chunkingParallelism is pointless for this usecase since we are directly streaming from Kafka. + // Furthermore the real `KafkaClient` implementation uses `CommittableOffsetBatch` which is a global singleton so + // we can't have concurrent updates to this data structure. + + val sink = currentState match { + case Some(state) => + logger.info( + s"Resuming previous upload with uploadId: ${state.uploadId} and bucket: ${s3Config.dataBucket} with key: $key" + ) + S3.resumeMultipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext]( + s3Config.dataBucket, + key, + state.uploadId, + state.parts, + kafkaBatchSink, + s3Headers = s3Headers, + chunkingParallelism = 1 + ) + case None => + logger.info( + s"Creating new upload with bucket: ${s3Config.dataBucket} and key: $key" + ) + S3.multipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext]( + s3Config.dataBucket, + key, + kafkaBatchSink, + s3Headers = s3Headers, + chunkingParallelism = 1 + ) + } + + val base = sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic)) maybeS3Settings.fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings))) } } diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientChunkState.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientChunkState.scala new file mode 100644 index 00000000..6bdc2d6f --- /dev/null +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientChunkState.scala @@ -0,0 +1,33 @@ +package io.aiven.guardian.kafka.backup.s3 + +import akka.Done +import akka.actor.ActorSystem +import akka.stream.alpakka.s3.S3Headers +import akka.stream.alpakka.s3.S3Settings +import akka.stream.alpakka.s3.SuccessfulUploadPart +import akka.stream.scaladsl.Sink +import io.aiven.guardian.kafka.KafkaClientInterface +import io.aiven.guardian.kafka.backup.configs.Backup +import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} + +import scala.collection.immutable +import scala.concurrent.Future + +import java.util.concurrent.ConcurrentLinkedQueue + +class BackupClientChunkState[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings])(implicit + override val kafkaClientInterface: T, + override val backupConfig: Backup, + override val system: ActorSystem, + s3Config: S3Config, + s3Headers: S3Headers +) extends BackupClient[T](maybeS3Settings) { + val processedChunks: 
ConcurrentLinkedQueue[SuccessfulUploadPart] = new ConcurrentLinkedQueue[SuccessfulUploadPart]() + + override val successSink + : Sink[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] = + super.successSink.contramap { case (part, value) => + processedChunks.add(part) + (part, value) + } +} diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala index f9204ba9..8fdd342b 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala @@ -1,7 +1,9 @@ package io.aiven.guardian.kafka.backup.s3 +import akka.actor.Scheduler import akka.stream.Attributes import akka.stream.alpakka.s3.BucketAccess +import akka.stream.alpakka.s3.ListBucketResultContents import akka.stream.alpakka.s3.S3Attributes import akka.stream.alpakka.s3.S3Settings import akka.stream.alpakka.s3.scaladsl.S3 @@ -14,6 +16,7 @@ import com.softwaremill.diffx.scalatest.DiffMatcher.matchTo import com.typesafe.scalalogging.StrictLogging import io.aiven.guardian.akka.AkkaHttpTestKit import io.aiven.guardian.kafka.Generators._ +import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst import io.aiven.guardian.kafka.codecs.Circe._ import io.aiven.guardian.kafka.models.ReducedConsumerRecord import io.aiven.guardian.kafka.s3.Config @@ -49,7 +52,9 @@ trait BackupClientSpec with StrictLogging { implicit val ec: ExecutionContext = system.dispatcher - implicit val defaultPatience: PatienceConfig = PatienceConfig(90 seconds, 100 millis) + implicit val defaultPatience: PatienceConfig = PatienceConfig(5 minutes, 100 millis) + implicit override val generatorDrivenConfig: PropertyCheckConfiguration = + PropertyCheckConfiguration(minSuccessful = 1, minSize = 1) val ThrottleElements: Int = 100 val ThrottleAmount: FiniteDuration = 1 millis @@ -128,13 +133,41 @@ trait BackupClientSpec case None => () } + /** @param dataBucket + * Which S3 bucket the objects are being persisted into + * @param transformResult + * A function that transforms the download result from S3 into the data `T` that you need. Note that you can also + * throw an exception in this transform function to trigger a retry (i.e. 
using it as a an additional predicate) + * @param attempts + * Total number of attempts + * @param delay + * The delay between each attempt after the first + * @tparam T + * Type of the final result transformed by `transformResult` + * @return + */ + def waitForS3Download[T](dataBucket: String, + transformResult: Seq[ListBucketResultContents] => T, + attempts: Int = 10, + delay: FiniteDuration = 1 second + ): Future[T] = { + implicit val scheduler: Scheduler = system.scheduler + + val attempt = () => + S3.listBucket(dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq).map { + transformResult + } + + akka.pattern.retry(attempt, attempts, delay) + } + property("backup method completes flow correctly for all valid Kafka events") { forAll(kafkaDataWithTimePeriodsGen(), s3ConfigGen(useVirtualDotHost, bucketPrefix)) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, s3Config: S3Config) => logger.info(s"Data bucket is ${s3Config.dataBucket}") val backupClient = new MockedS3BackupClientInterface( Source(kafkaDataWithTimePeriod.data).throttle(ThrottleElements, ThrottleAmount), - kafkaDataWithTimePeriod.periodSlice, + PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice), s3Config, Some(s3Settings) ) @@ -171,7 +204,7 @@ trait BackupClientSpec }) keysWithRecords <- Future.sequence(keysWithSource.map { case (key, source) => source - .via(CirceStreamSupport.decode[List[ReducedConsumerRecord]]) + .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) .toMat(Sink.collection)(Keep.right) .run() .map(list => (key, list.flatten)) @@ -181,7 +214,9 @@ trait BackupClientSpec OffsetDateTime.parse(date).toEpochSecond }(Ordering[Long].reverse) flattened = sorted.flatMap { case (_, records) => records } - } yield flattened + } yield flattened.collect { case Some(reducedConsumerRecord) => + reducedConsumerRecord + } val observed = calculatedFuture.futureValue kafkaDataWithTimePeriod.data.containsSlice(observed) mustEqual true diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/KafkaClientWithKillSwitch.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/KafkaClientWithKillSwitch.scala new file mode 100644 index 00000000..cd245090 --- /dev/null +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/KafkaClientWithKillSwitch.scala @@ -0,0 +1,27 @@ +package io.aiven.guardian.kafka.backup.s3 + +import akka.actor.ActorSystem +import akka.kafka.CommitterSettings +import akka.kafka.ConsumerMessage +import akka.kafka.ConsumerSettings +import akka.kafka.scaladsl.Consumer +import akka.stream.SharedKillSwitch +import akka.stream.scaladsl.SourceWithContext +import io.aiven.guardian.kafka.KafkaClient +import io.aiven.guardian.kafka.configs.KafkaCluster +import io.aiven.guardian.kafka.models.ReducedConsumerRecord + +class KafkaClientWithKillSwitch( + configureConsumer: Option[ + ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]] + ] = None, + configureCommitter: Option[ + CommitterSettings => CommitterSettings + ] = None, + killSwitch: SharedKillSwitch +)(implicit system: ActorSystem, kafkaClusterConfig: KafkaCluster) + extends KafkaClient(configureConsumer, configureCommitter) { + override def getSource + : SourceWithContext[ReducedConsumerRecord, ConsumerMessage.CommittableOffset, Consumer.Control] = + super.getSource.via(killSwitch.flow) +} diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala 
b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala index d92eaf0f..3e7f90b0 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala @@ -1,24 +1,25 @@ package io.aiven.guardian.kafka.backup.s3 import akka.NotUsed +import akka.actor.ActorSystem import akka.stream.alpakka.s3.S3Headers import akka.stream.alpakka.s3.S3Settings import akka.stream.scaladsl.Source import io.aiven.guardian.kafka.MockedKafkaClientInterface import io.aiven.guardian.kafka.backup.configs.Backup +import io.aiven.guardian.kafka.backup.configs.TimeConfiguration import io.aiven.guardian.kafka.models.ReducedConsumerRecord import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} -import scala.concurrent.duration.FiniteDuration - class MockedS3BackupClientInterface( kafkaData: Source[ReducedConsumerRecord, NotUsed], - periodSlice: FiniteDuration, + timeConfiguration: TimeConfiguration, s3Config: S3Config, maybeS3Settings: Option[S3Settings] -)(implicit val s3Headers: S3Headers) +)(implicit val s3Headers: S3Headers, system: ActorSystem) extends BackupClient(maybeS3Settings)(new MockedKafkaClientInterface(kafkaData), - Backup(periodSlice), + Backup(timeConfiguration), + implicitly, s3Config, implicitly ) diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala index b6ae88a6..6851938c 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala @@ -1,13 +1,52 @@ package io.aiven.guardian.kafka.backup.s3 +import akka.Done +import akka.NotUsed import akka.actor.ActorSystem +import akka.kafka.ConsumerSettings +import akka.kafka.ProducerSettings +import akka.kafka.scaladsl.Producer +import akka.stream.KillSwitches +import akka.stream.SharedKillSwitch +import akka.stream.alpakka.s3.ListBucketResultContents import akka.stream.alpakka.s3.S3Settings +import akka.stream.alpakka.s3.scaladsl.S3 +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source import io.aiven.guardian.akka.AnyPropTestKit +import io.aiven.guardian.kafka.Generators.KafkaDataInChunksWithTimePeriod +import io.aiven.guardian.kafka.Generators.kafkaDataWithMinSizeGen +import io.aiven.guardian.kafka.KafkaClient +import io.aiven.guardian.kafka.KafkaClusterTest +import io.aiven.guardian.kafka.Utils._ +import io.aiven.guardian.kafka.backup.configs.Backup +import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice +import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst +import io.aiven.guardian.kafka.codecs.Circe._ +import io.aiven.guardian.kafka.configs.KafkaCluster +import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import io.aiven.guardian.kafka.s3.Generators.s3ConfigGen +import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.mdedetrich.akka.stream.support.CirceStreamSupport +import scala.concurrent.Future import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ +import 
scala.jdk.FutureConverters._ import scala.language.postfixOps -class RealS3BackupClientSpec extends AnyPropTestKit(ActorSystem("RealS3BackupClientSpec")) with BackupClientSpec { +import java.time.temporal.ChronoUnit +import java.util.Base64 + +class RealS3BackupClientSpec + extends AnyPropTestKit(ActorSystem("RealS3BackupClientSpec")) + with BackupClientSpec + with KafkaClusterTest { override lazy val s3Settings: S3Settings = S3Settings() /** Virtual Dot Host in bucket names are disabled because you need an actual DNS certificate otherwise AWS will fail @@ -16,4 +55,289 @@ class RealS3BackupClientSpec extends AnyPropTestKit(ActorSystem("RealS3BackupCli override lazy val useVirtualDotHost: Boolean = false override lazy val bucketPrefix: Option[String] = Some("guardian-") override lazy val enableCleanup: Option[FiniteDuration] = Some(5 seconds) + + /** Timeout constant to wait for both Akka Streams plus initialization of consumer/kafka cluster + */ + val KafkaInitializationTimeoutConstant: FiniteDuration = AkkaStreamInitializationConstant + (2.5 seconds) + + case object TerminationException extends Exception("termination-exception") + + def reducedConsumerRecordsToJson(reducedConsumerRecords: List[ReducedConsumerRecord]): Array[Byte] = { + import io.aiven.guardian.kafka.codecs.Circe._ + import io.circe.syntax._ + reducedConsumerRecords.asJson.noSpaces.getBytes + } + + def baseKafkaConfig: Some[ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]] = + Some( + _.withBootstrapServers( + container.bootstrapServers + ).withGroupId("test-group") + ) + + def createKafkaClient( + killSwitch: SharedKillSwitch + )(implicit kafkaClusterConfig: KafkaCluster): KafkaClientWithKillSwitch = + new KafkaClientWithKillSwitch( + configureConsumer = baseKafkaConfig, + killSwitch = killSwitch + ) + + /** Converts a generated list of `ReducedConsumerRecord` to a list of `ProducerRecord` + * @param data + * List of `ReducedConsumerRecord`'s generated by scalacheck + * @return + * A list of `ProducerRecord`. 
Note that it only uses the `key`/`value` and ignores other values + */ + def toProducerRecords(data: List[ReducedConsumerRecord]): List[ProducerRecord[Array[Byte], Array[Byte]]] = data.map { + reducedConsumerRecord => + val keyAsByteArray = Base64.getDecoder.decode(reducedConsumerRecord.key) + val valueAsByteArray = Base64.getDecoder.decode(reducedConsumerRecord.value) + new ProducerRecord[Array[Byte], Array[Byte]](reducedConsumerRecord.topic, keyAsByteArray, valueAsByteArray) + } + + /** Converts a list of `ProducerRecord` to a source that is streamed over a period of time + * @param producerRecords + * The list of producer records + * @param streamDuration + * The period over which the topics will be streamed + * @return + * Source ready to be passed into a Kafka producer + */ + def toSource( + producerRecords: List[ProducerRecord[Array[Byte], Array[Byte]]], + streamDuration: FiniteDuration + ): Source[ProducerRecord[Array[Byte], Array[Byte]], NotUsed] = { + val durationToMicros = streamDuration.toMillis + val topicsPerMillis = producerRecords.size / durationToMicros + Source(producerRecords).throttle(topicsPerMillis.toInt, 1 millis) + } + + /** Call this function to send a message after the next step of configured time period to trigger a rollover so the + * current object will finish processing + * @param duration + * @param producerSettings + * @param topic + * @return + */ + def sendTopicAfterTimePeriod(duration: FiniteDuration, + producerSettings: ProducerSettings[Array[Byte], Array[Byte]], + topic: String + ): Future[Done] = akka.pattern.after(duration) { + Source( + List( + new ProducerRecord[Array[Byte], Array[Byte]](topic, "1".getBytes, "1".getBytes) + ) + ).runWith(Producer.plainSink(producerSettings)) + } + + def createProducer(): ProducerSettings[Array[Byte], Array[Byte]] = + ProducerSettings(system, new ByteArraySerializer, new ByteArraySerializer) + .withBootstrapServers(container.bootstrapServers) + + case class DownloadNotReady(downloads: Seq[ListBucketResultContents]) + extends Exception(s"Download not ready, current state is ${downloads.map(_.toString).mkString(",")}") + + def getKeyFromSingleDownload(dataBucket: String): Future[String] = waitForS3Download( + dataBucket, + { + case Seq(single) => single.key + case rest => + throw DownloadNotReady(rest) + } + ) + + property("entire flow works properly from start to end") { + forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson), + s3ConfigGen(useVirtualDotHost, bucketPrefix) + ) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) => + logger.info(s"Data bucket is ${s3Config.dataBucket}") + + val data = kafkaDataInChunksWithTimePeriod.data.flatten + + val topics = data.map(_.topic).toSet + + val asProducerRecords = toProducerRecords(data) + val baseSource = toSource(asProducerRecords, 30 seconds) + + implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics) + + implicit val config: S3Config = s3Config + implicit val backupConfig: Backup = Backup(PeriodFromFirst(1 minute)) + + val producerSettings = createProducer() + + val backupClient = + new BackupClient(Some(s3Settings))(new KafkaClient(configureConsumer = baseKafkaConfig), + implicitly, + implicitly, + implicitly, + implicitly + ) + + val adminClient = AdminClient.create( + Map[String, AnyRef]( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> container.bootstrapServers + ).asJava + ) + + val createTopics = adminClient.createTopics(topics.map { topic => + new NewTopic(topic, 1, 1.toShort) + 
}.asJava) + + val calculatedFuture = for { + _ <- createTopics.all().toCompletableFuture.asScala + _ <- createBucket(s3Config.dataBucket) + _ = backupClient.backup.run() + _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( + baseSource + .runWith(Producer.plainSink(producerSettings)) + ) + _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) + key <- getKeyFromSingleDownload(s3Config.dataBucket) + downloaded <- + S3.download(s3Config.dataBucket, key) + .withAttributes(s3Attrs) + .runWith(Sink.head) + .flatMap { + case Some((downloadSource, _)) => + downloadSource.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]).runWith(Sink.seq) + case None => throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key") + } + + } yield downloaded.toList.flatten.collect { case Some(reducedConsumerRecord) => + reducedConsumerRecord + } + + val downloaded = calculatedFuture.futureValue + + val downloadedGroupedAsKey = downloaded + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val inputAsKey = data + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + downloadedGroupedAsKey mustEqual inputAsKey + } + } + + def waitUntilBackupClientHasCommitted(backupClient: BackupClientChunkState[_], + step: FiniteDuration = 100 millis, + delay: FiniteDuration = 5 seconds + ): Future[Unit] = + if (backupClient.processedChunks.size() > 0) + akka.pattern.after(delay)(Future.successful(())) + else + akka.pattern.after(step)(waitUntilBackupClientHasCommitted(backupClient, step, delay)) + + property("suspend/resume works correctly") { + forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson), + s3ConfigGen(useVirtualDotHost, bucketPrefix) + ) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) => + logger.info(s"Data bucket is ${s3Config.dataBucket}") + + val data = kafkaDataInChunksWithTimePeriod.data.flatten + + val topics = data.map(_.topic).toSet + + implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics) + + implicit val config: S3Config = s3Config + implicit val backupConfig: Backup = Backup(ChronoUnitSlice(ChronoUnit.MINUTES)) + + val producerSettings = createProducer() + + val firstKillSwitch = KillSwitches.shared("first-kill-switch") + + val backupClient = + new BackupClientChunkState(Some(s3Settings))(createKafkaClient(firstKillSwitch), + implicitly, + implicitly, + implicitly, + implicitly + ) + + val asProducerRecords = toProducerRecords(data) + val baseSource = toSource(asProducerRecords, 30 seconds) + + val adminClient = AdminClient.create( + Map[String, AnyRef]( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> container.bootstrapServers + ).asJava + ) + + val createTopics = adminClient.createTopics(topics.map { topic => + new NewTopic(topic, 1, 1.toShort) + }.asJava) + + val calculatedFuture = for { + _ <- createTopics.all().toCompletableFuture.asScala + _ <- createBucket(s3Config.dataBucket) + _ = backupClient.backup.run() + _ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES) + _ = baseSource.runWith(Producer.plainSink(producerSettings)) + _ <- waitUntilBackupClientHasCommitted(backupClient) + _ = firstKillSwitch.abort(TerminationException) + secondBackupClient <- akka.pattern.after(2 seconds) { + Future { + new BackupClient(Some(s3Settings))( + new KafkaClient(configureConsumer = baseKafkaConfig), + implicitly, + implicitly, + 
implicitly, + implicitly + ) + } + } + _ = secondBackupClient.backup.run() + _ <- sendTopicAfterTimePeriod(1 minutes, producerSettings, topics.head) + key <- getKeyFromSingleDownload(s3Config.dataBucket) + downloaded <- S3.download(s3Config.dataBucket, key) + .withAttributes(s3Attrs) + .runWith(Sink.head) + .flatMap { + case Some((downloadSource, _)) => + downloadSource + .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) + .runWith(Sink.seq) + case None => + throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key") + } + + } yield downloaded.toList.flatten.collect { case Some(reducedConsumerRecord) => + reducedConsumerRecord + } + + val downloaded = calculatedFuture.futureValue + + // Only care about ordering when it comes to key + val downloadedGroupedAsKey = downloaded + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val inputAsKey = data + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + downloadedGroupedAsKey mustEqual inputAsKey + } + } } diff --git a/build.sbt b/build.sbt index fee51c89..cf935554 100644 --- a/build.sbt +++ b/build.sbt @@ -10,7 +10,7 @@ ThisBuild / resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/c val akkaVersion = "2.6.17" val akkaHttpVersion = "10.2.7" val alpakkaKafkaVersion = "2.1.1" -val alpakkaVersion = "3.0.2+34-bdac5519+20211013-1607-SNAPSHOT" +val alpakkaVersion = "3.0.4" val quillJdbcMonixVersion = "3.7.2" val postgresqlJdbcVersion = "42.2.24" val scalaLoggingVersion = "3.9.4" @@ -23,6 +23,7 @@ val akkaStreamsJson = "0.8.0" val diffxVersion = "0.5.6" val testContainersVersion = "0.39.8" val testContainersJavaVersion = "1.16.2" +val scalaCheckVersion = "1.15.5-1-SNAPSHOT" val flagsFor12 = Seq( "-Xlint:_", @@ -86,6 +87,7 @@ lazy val core = project "com.typesafe.akka" %% "akka-stream" % akkaVersion % Test, "org.scalatest" %% "scalatest" % scalaTestVersion % Test, "org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test, + "org.mdedetrich" %% "scalacheck" % scalaCheckVersion % Test, "com.softwaremill.diffx" %% "diffx-scalatest" % diffxVersion % Test, "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test, "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % Test, @@ -101,10 +103,10 @@ lazy val coreS3 = project librarySettings, name := s"$baseName-s3", libraryDependencies ++= Seq( - "org.mdedetrich" %% "akka-stream-alpakka-s3" % alpakkaVersion, - "org.scalatest" %% "scalatest" % scalaTestVersion % Test, - "org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test, - "com.typesafe.akka" %% "akka-http-xml" % akkaHttpVersion % Test + "com.lightbend.akka" %% "akka-stream-alpakka-s3" % alpakkaVersion, + "org.scalatest" %% "scalatest" % scalaTestVersion % Test, + "org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test, + "com.typesafe.akka" %% "akka-http-xml" % akkaHttpVersion % Test ) ) .dependsOn(core % "compile->compile;test->test") @@ -115,10 +117,10 @@ lazy val coreGCS = project librarySettings, name := s"$baseName-gcs", libraryDependencies ++= Seq( - "org.mdedetrich" %% "akka-stream-alpakka-google-cloud-storage" % alpakkaVersion, - "org.scalatest" %% "scalatest" % scalaTestVersion % Test, - "org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test, - "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion % Test + "com.lightbend.akka" %% 
"akka-stream-alpakka-google-cloud-storage" % alpakkaVersion, + "org.scalatest" %% "scalatest" % scalaTestVersion % Test, + "org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test, + "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion % Test ) ) .dependsOn(core % "compile->compile;test->test") diff --git a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala index 286d22a0..e3844693 100644 --- a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala +++ b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala @@ -1,18 +1,23 @@ package io.aiven.guardian.kafka.backup import akka.NotUsed +import akka.actor.ActorSystem import akka.stream.scaladsl._ import akka.util.ByteString import io.aiven.guardian.kafka.Errors import io.aiven.guardian.kafka.KafkaClientInterface import io.aiven.guardian.kafka.backup.configs.Backup +import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice +import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst +import io.aiven.guardian.kafka.backup.configs.TimeConfiguration import io.aiven.guardian.kafka.codecs.Circe._ import io.aiven.guardian.kafka.models.ReducedConsumerRecord import io.circe.syntax._ import scala.annotation.nowarn +import scala.concurrent.ExecutionContext import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration +import scala.jdk.DurationConverters._ import java.time._ import java.time.format.DateTimeFormatter @@ -25,6 +30,7 @@ import java.time.temporal._ trait BackupClientInterface[T <: KafkaClientInterface] { implicit val kafkaClientInterface: T implicit val backupConfig: Backup + implicit val system: ActorSystem /** An element from the original record */ @@ -53,15 +59,33 @@ trait BackupClientInterface[T <: KafkaClientInterface] { */ type BackupResult + /** Override this type to define the result of calculating the previous state (if it exists) + */ + type CurrentState + import BackupClientInterface._ - /** Override this method to define how to backup a `ByteString` to a `DataSource` + /** Override this method to define how to retrieve the current state of a backup. + * @param key + * The object key or filename for what is being backed up + * @return + * An optional [[Future]] that contains the state if it found a previously aborted backup. Return [[None]] if if + * its a brand new backup. + */ + def getCurrentUploadState(key: String): Future[Option[CurrentState]] + + /** Override this method to define how to backup a `ByteString` combined with Kafka + * `kafkaClientInterface.CursorContext` to a `DataSource` * @param key * The object key or filename for what is being backed up + * @param currentState + * The current state if it exists. This is used when resuming from a previously aborted backup * @return * A Sink that also provides a `BackupResult` */ - def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] + def backupToStorageSink(key: String, + currentState: Option[CurrentState] + ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] /** Override this method to define a zero vale that covers the case that occurs immediately when `SubFlow` has been * split at `BackupStreamPosition.Start`. 
If you have difficulties defining an empty value for `BackupResult` then @@ -134,7 +158,7 @@ trait BackupClientInterface[T <: KafkaClientInterface] { kafkaClientInterface.CursorContext, kafkaClientInterface.Control ] = SourceWithContext.fromTuples(source.map { case (firstTimestamp, (record, context)) => - val period = calculateNumberOfPeriodsFromTimestamp(firstTimestamp, backupConfig.periodSlice, record) + val period = calculateNumberOfPeriodsFromTimestamp(firstTimestamp, backupConfig.timeConfiguration, record) ((record, period), context) }) @@ -247,6 +271,30 @@ trait BackupClientInterface[T <: KafkaClientInterface] { } } + /** Prepares the sink before it gets handed to `backupToStorageSink` + */ + private[backup] def prepareStartOfStream(state: Option[CurrentState], + start: Start + ): Sink[ByteStringElement, Future[BackupResult]] = + if (state.isDefined) + Flow[ByteStringElement] + .flatMapPrefix(1) { + case Seq(byteStringElement: Start) => + val withoutStartOfJsonArray = byteStringElement.data.drop(1) + Flow[ByteStringElement].prepend( + Source.single(byteStringElement.copy(data = withoutStartOfJsonArray)) + ) + case _ => throw Errors.ExpectedStartOfSource + } + .toMat(backupToStorageSink(start.key, state).contramap[ByteStringElement] { byteStringElement => + (byteStringElement.data, byteStringElement.context) + })(Keep.right) + else + backupToStorageSink(start.key, state) + .contramap[ByteStringElement] { byteStringElement => + (byteStringElement.data, byteStringElement.context) + } + /** The entire flow that involves reading from Kafka, transforming the data into JSON and then backing it up into a * data source. * @return @@ -254,11 +302,6 @@ trait BackupClientInterface[T <: KafkaClientInterface] { * the underlying stream). */ def backup: RunnableGraph[kafkaClientInterface.Control] = { - // TODO use https://awscli.amazonaws.com/v2/documentation/api/latest/reference/s3api/list-multipart-uploads.html - // and https://stackoverflow.com/questions/53764876/resume-s3-multipart-upload-partetag to find any in progress - // multiupload to resume from previous termination. Looks like we will have to do this manually since its not in - // Alpakka yet - val withBackupStreamPositions = calculateBackupStreamPositions(sourceWithPeriods(sourceWithFirstRecord)) val split = withBackupStreamPositions @@ -275,10 +318,10 @@ trait BackupClientInterface[T <: KafkaClientInterface] { case (Seq(only: Element, End), _) => // This case only occurs when you have a single element in a timeslice. 
// We have to terminate immediately to create a JSON array with a single element - val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime) + val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime, backupConfig.timeConfiguration) Source(transformFirstElement(only, key, terminate = true)) case (Seq(first: Element, second: Element), restOfReducedConsumerRecords) => - val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime) + val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime, backupConfig.timeConfiguration) val firstSource = transformFirstElement(first, key, terminate = false) val rest = Source.combine( @@ -303,33 +346,27 @@ trait BackupClientInterface[T <: KafkaClientInterface] { )(Concat(_)) case (Seq(only: Element), _) => // This case can also occur when user terminates the stream - val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime) + val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime, backupConfig.timeConfiguration) Source(transformFirstElement(only, key, terminate = false)) case (rest, _) => throw Errors.UnhandledStreamCase(rest) } - // Note that .alsoTo triggers after .to, see https://stackoverflow.com/questions/47895991/multiple-sinks-in-the-same-stream#comment93028098_47896071 @nowarn("msg=method lazyInit in object Sink is deprecated") - val subFlowSink = substreams - .alsoTo(kafkaClientInterface.commitCursor.contramap[ByteStringElement] { byteStringElement => - byteStringElement.context - }) - .to( - // See https://stackoverflow.com/questions/68774425/combine-prefixandtail1-with-sink-lazysink-for-subflow-created-by-splitafter/68776660?noredirect=1#comment121558518_68776660 - Sink.lazyInit( - { - case start: Start => - Future.successful( - backupToStorageSink(start.key).contramap[ByteStringElement] { byteStringElement => - byteStringElement.data - } - ) - case _ => throw Errors.ExpectedStartOfSource - }, - empty - ) + val subFlowSink = substreams.to( + // See https://stackoverflow.com/questions/68774425/combine-prefixandtail1-with-sink-lazysink-for-subflow-created-by-splitafter/68776660?noredirect=1#comment121558518_68776660 + Sink.lazyInit( + { + case start: Start => + implicit val ec: ExecutionContext = system.getDispatcher + for { + state <- getCurrentUploadState(start.key) + } yield prepareStartOfStream(state, start) + case _ => throw Errors.ExpectedStartOfSource + }, + empty ) + ) subFlowSink } @@ -348,8 +385,14 @@ object BackupClientInterface { * @return * A `String` that can be used either as some object key or a filename */ - def calculateKey(offsetDateTime: OffsetDateTime): String = - s"${BackupClientInterface.formatOffsetDateTime(offsetDateTime)}.json" + def calculateKey(offsetDateTime: OffsetDateTime, timeConfiguration: TimeConfiguration): String = { + val finalTime = timeConfiguration match { + case ChronoUnitSlice(chronoUnit) => offsetDateTime.truncatedTo(chronoUnit) + case _ => offsetDateTime + } + + s"${BackupClientInterface.formatOffsetDateTime(finalTime)}.json" + } /** Calculates whether we have rolled over a time period given number of divided periods. 
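A small illustration (not taken from the patch) of why the `ChronoUnitSlice` branch of `calculateKey` above truncates the timestamp: two records that fall into the same time slice produce the same object key, which is what allows a resumed backup to locate and continue the previous upload. The formatter below is a hypothetical stand-in for `BackupClientInterface.formatOffsetDateTime`.

import java.time.{OffsetDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

object KeySketch extends App {
  // Hypothetical stand-in for the project's formatter; the real one may differ.
  private val fmt = DateTimeFormatter.ISO_OFFSET_DATE_TIME

  def keyFor(ts: OffsetDateTime, unit: ChronoUnit): String =
    s"${fmt.format(ts.truncatedTo(unit))}.json"

  val t1 = OffsetDateTime.of(2021, 11, 3, 10, 15, 30, 0, ZoneOffset.UTC)
  val t2 = t1.plusSeconds(42)

  // Both timestamps land in the same MINUTES slice, hence the same key.
  println(keyFor(t1, ChronoUnit.MINUTES) == keyFor(t2, ChronoUnit.MINUTES)) // true
}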
* @param dividedPeriodsBefore @@ -369,9 +412,16 @@ object BackupClientInterface { } protected def calculateNumberOfPeriodsFromTimestamp(initialTime: OffsetDateTime, - period: FiniteDuration, + timeConfiguration: TimeConfiguration, reducedConsumerRecord: ReducedConsumerRecord - ): Long = + ): Long = { + val (period, finalInitialTime) = timeConfiguration match { + case PeriodFromFirst(duration) => (duration, initialTime) + case ChronoUnitSlice(chronoUnit) => + (chronoUnit.getDuration.toScala, initialTime.truncatedTo(chronoUnit)) + } + // TODO handle overflow? - ChronoUnit.MICROS.between(initialTime, reducedConsumerRecord.toOffsetDateTime) / period.toMicros + ChronoUnit.MICROS.between(finalInitialTime, reducedConsumerRecord.toOffsetDateTime) / period.toMicros + } } diff --git a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Backup.scala b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Backup.scala index b070fb3a..0fcddd50 100644 --- a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Backup.scala +++ b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/Backup.scala @@ -1,8 +1,6 @@ package io.aiven.guardian.kafka.backup.configs -import scala.concurrent.duration.FiniteDuration - -/** @param periodSlice - * The time period for each given slice that stores all of the `ReducedConsumerRecord` +/** @param timeConfiguration + * Determines how the backed up objects/files are segregated depending on a time configuration */ -final case class Backup(periodSlice: FiniteDuration) +final case class Backup(timeConfiguration: TimeConfiguration) diff --git a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/TimeConfiguration.scala b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/TimeConfiguration.scala new file mode 100644 index 00000000..ce68e44f --- /dev/null +++ b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/configs/TimeConfiguration.scala @@ -0,0 +1,23 @@ +package io.aiven.guardian.kafka.backup.configs + +import scala.concurrent.duration.Duration + +import java.time.temporal.ChronoUnit + +sealed trait TimeConfiguration + +/** Backs up objects/files depending on the timestamp fo the first received Kafka message. Suspending/resuming the + * backup client will always create a new object/file + * @param duration + * The maximum span of time for each object/file, when this duration is exceeded a new file is created + */ +final case class PeriodFromFirst(duration: Duration) extends TimeConfiguration + +/** Backs up objects/files by collecting received Kafka messages into a single time slice based on a [[ChronoUnit]]. + * When suspending/resuming the backup client, this option will reuse existing objects/keys if they fall into the + * currently configured `chronoUnit`. + * @param chronoUnit + * Timestamps for kafka messages that are contained within the configured [[ChronoUnit]] will be placed into the same + * object/file. 
+ */ +final case class ChronoUnitSlice(chronoUnit: ChronoUnit) extends TimeConfiguration diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala index 9f8c6eb4..b409c485 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala @@ -8,6 +8,7 @@ import io.aiven.guardian.akka.AkkaStreamTestKit import io.aiven.guardian.akka.AnyPropTestKit import io.aiven.guardian.kafka.Generators.KafkaDataWithTimePeriod import io.aiven.guardian.kafka.Generators.kafkaDataWithTimePeriodsGen +import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst import io.aiven.guardian.kafka.codecs.Circe._ import io.aiven.guardian.kafka.models.ReducedConsumerRecord import org.apache.kafka.common.record.TimestampType @@ -41,7 +42,7 @@ class BackupClientInterfaceSpec forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - kafkaDataWithTimePeriod.periodSlice + PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) ) val calculatedFuture = mock.materializeBackupStreamPositions() @@ -58,7 +59,7 @@ class BackupClientInterfaceSpec forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - kafkaDataWithTimePeriod.periodSlice + PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) ) val result = mock.materializeBackupStreamPositions().futureValue.toList @@ -93,7 +94,7 @@ class BackupClientInterfaceSpec forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - kafkaDataWithTimePeriod.periodSlice + PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) ) val result = mock.materializeBackupStreamPositions().futureValue.toList @@ -118,7 +119,7 @@ class BackupClientInterfaceSpec (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - kafkaDataWithTimePeriod.periodSlice + PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) ) mock.clear() @@ -161,7 +162,7 @@ class BackupClientInterfaceSpec (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - kafkaDataWithTimePeriod.periodSlice + PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) ) mock.clear() @@ -199,7 +200,7 @@ class BackupClientInterfaceSpec val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source.single( reducedConsumerRecord ), - 1 day + PeriodFromFirst(1 day) ) mock.clear() val calculatedFuture = for { @@ -238,7 +239,7 @@ class BackupClientInterfaceSpec val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source( reducedConsumerRecords ), - 1 millis + PeriodFromFirst(1 millis) ) mock.clear() val calculatedFuture = for { @@ -273,7 +274,7 @@ class BackupClientInterfaceSpec (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - kafkaDataWithTimePeriod.periodSlice + 
PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice) ) mock.clear() diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala index 0e9ed4e3..dc1c72e9 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala @@ -10,11 +10,11 @@ import akka.util.ByteString import io.aiven.guardian.kafka.MockedKafkaClientInterface import io.aiven.guardian.kafka.Utils._ import io.aiven.guardian.kafka.backup.configs.Backup +import io.aiven.guardian.kafka.backup.configs.TimeConfiguration import io.aiven.guardian.kafka.models.ReducedConsumerRecord import scala.collection.immutable import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ import java.time.OffsetDateTime @@ -25,8 +25,9 @@ import java.util.concurrent.ConcurrentLinkedQueue * @param periodSlice */ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafkaClientInterface, - periodSlice: FiniteDuration -) extends BackupClientInterface[MockedKafkaClientInterface] { + timeConfiguration: TimeConfiguration +)(implicit override val system: ActorSystem) + extends BackupClientInterface[MockedKafkaClientInterface] { /** The collection that receives the data as its being submitted where each value is the key along with the * `ByteString`. Use `mergeBackedUpData` to process `backedUpData` into a more convenient data structure once you @@ -72,13 +73,17 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka def clear(): Unit = backedUpData.clear() override implicit lazy val backupConfig: Backup = Backup( - periodSlice + timeConfiguration ) /** Override this type to define the result of backing up data to a datasource */ override type BackupResult = Done + override type CurrentState = Nothing + + override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None) + override def empty: () => Future[Done] = () => Future.successful(Done) /** Override this method to define how to backup a `ByteString` to a `DataSource` @@ -88,9 +93,12 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka * @return * A Sink that also provides a `BackupResult` */ - override def backupToStorageSink(key: String): Sink[ByteString, Future[Done]] = Sink.foreach { byteString => - backedUpData.add((key, byteString)) - } + override def backupToStorageSink(key: String, + currentState: Option[Nothing] + ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[Done]] = + Sink.foreach { case (byteString, _) => + backedUpData.add((key, byteString)) + } def materializeBackupStreamPositions()(implicit system: ActorSystem @@ -103,5 +111,6 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka /** A `MockedBackupClientInterface` that also uses a mocked `KafkaClientInterface` */ class MockedBackupClientInterfaceWithMockedKafkaData(kafkaData: Source[ReducedConsumerRecord, NotUsed], - periodSlice: FiniteDuration -) extends MockedBackupClientInterface(new MockedKafkaClientInterface(kafkaData), periodSlice) + timeConfiguration: TimeConfiguration +)(implicit override val system: ActorSystem) + extends MockedBackupClientInterface(new MockedKafkaClientInterface(kafkaData), timeConfiguration) diff --git 
a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 8ff83dfe..051a1ee8 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -28,11 +28,12 @@ akka.kafka.consumer = { } akka.kafka.committer = { + max-batch = 100000 max-batch = ${?AKKA_KAFKA_COMMITTER_MAX_BATCH} + max-interval = 1 hour max-interval = ${?AKKA_KAFKA_COMMITTER_MAX_INTERVAL} parallelism = ${?AKKA_KAFKA_COMMITTER_PARALLELISM} - delivery = ${?AKKA_KAFKA_COMMITTER_DELIVERY} - when = ${?AKKA_KAFKA_COMMITTER_WHEN} + parallelism = 10000 } kafka-cluster = { diff --git a/core/src/main/scala/io/aiven/guardian/kafka/KafkaClient.scala b/core/src/main/scala/io/aiven/guardian/kafka/KafkaClient.scala index 017cafce..10e5afd7 100644 --- a/core/src/main/scala/io/aiven/guardian/kafka/KafkaClient.scala +++ b/core/src/main/scala/io/aiven/guardian/kafka/KafkaClient.scala @@ -2,9 +2,10 @@ package io.aiven.guardian.kafka import akka.Done import akka.actor.ActorSystem +import akka.kafka.CommitDelivery import akka.kafka.CommitterSettings -import akka.kafka.ConsumerMessage.Committable import akka.kafka.ConsumerMessage.CommittableOffset +import akka.kafka.ConsumerMessage.CommittableOffsetBatch import akka.kafka.ConsumerSettings import akka.kafka.Subscriptions import akka.kafka.scaladsl.Committer @@ -14,8 +15,10 @@ import akka.stream.scaladsl.SourceWithContext import com.typesafe.scalalogging.StrictLogging import io.aiven.guardian.kafka.configs.KafkaCluster import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.ByteArrayDeserializer +import scala.collection.immutable import scala.concurrent.Future import java.util.Base64 @@ -23,27 +26,39 @@ import java.util.Base64 /** A Kafka Client that uses Alpakka Kafka Consumer under the hood to create a stream of events from a Kafka cluster. To * configure the Alpakka Kafka Consumer use the standard typesafe configuration i.e. akka.kafka.consumer (note that the * `keySerializer` and `valueSerializer` are hardcoded so you cannot override this). 
- * @param configure + * @param configureConsumer * A way to configure the underlying Kafka consumer settings + * @param configureCommitter + * A way to configure the underlying kafka committer settings * @param system * A classic `ActorSystem` * @param kafkaClusterConfig * Additional cluster configuration that is needed */ class KafkaClient( - configure: Option[ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]] = None + configureConsumer: Option[ + ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]] + ] = None, + configureCommitter: Option[ + CommitterSettings => CommitterSettings + ] = None )(implicit system: ActorSystem, kafkaClusterConfig: KafkaCluster) extends KafkaClientInterface with StrictLogging { - override type CursorContext = Committable - override type Control = Consumer.Control + override type CursorContext = CommittableOffset + override type Control = Consumer.Control + override type BatchedCursorContext = CommittableOffsetBatch if (kafkaClusterConfig.topics.isEmpty) logger.warn("Kafka Cluster configuration has no topics set") private[kafka] val consumerSettings = { val base = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer) - configure.fold(base)(block => block(base)) + configureConsumer + .fold(base)(block => block(base)) + .withProperties( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest" + ) } private[kafka] val subscriptions = Subscriptions.topics(kafkaClusterConfig.topics) @@ -51,7 +66,7 @@ class KafkaClient( /** @return * A `SourceWithContext` that returns a Kafka Stream which automatically handles committing of cursors */ - override val getSource: SourceWithContext[ReducedConsumerRecord, CommittableOffset, Consumer.Control] = + override def getSource: SourceWithContext[ReducedConsumerRecord, CommittableOffset, Consumer.Control] = Consumer .sourceWithOffsetContext(consumerSettings, subscriptions) .map(consumerRecord => @@ -65,10 +80,24 @@ class KafkaClient( ) ) - private[kafka] val committerSettings: CommitterSettings = CommitterSettings(system) + private[kafka] val committerSettings: CommitterSettings = { + val base = CommitterSettings(system) + configureCommitter + .fold(base)(block => block(base)) + .withDelivery(CommitDelivery.waitForAck) + } /** @return * A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message */ - override val commitCursor: Sink[Committable, Future[Done]] = Committer.sink(committerSettings) + override def commitCursor: Sink[CommittableOffsetBatch, Future[Done]] = Committer.sink(committerSettings) + + /** How to batch an immutable iterable of `CursorContext` into a `BatchedCursorContext` + * @param cursors + * The cursors that need to be batched + * @return + * A collection data structure that represents the batched cursors + */ + override def batchCursorContext(cursors: immutable.Iterable[CommittableOffset]): CommittableOffsetBatch = + CommittableOffsetBatch(cursors.toSeq) } diff --git a/core/src/main/scala/io/aiven/guardian/kafka/KafkaClientInterface.scala b/core/src/main/scala/io/aiven/guardian/kafka/KafkaClientInterface.scala index b6abf95c..6c4e4916 100644 --- a/core/src/main/scala/io/aiven/guardian/kafka/KafkaClientInterface.scala +++ b/core/src/main/scala/io/aiven/guardian/kafka/KafkaClientInterface.scala @@ -5,6 +5,7 @@ import akka.stream.scaladsl.Sink import akka.stream.scaladsl.SourceWithContext import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import 
diff --git a/core/src/main/scala/io/aiven/guardian/kafka/KafkaClientInterface.scala b/core/src/main/scala/io/aiven/guardian/kafka/KafkaClientInterface.scala
index b6abf95c..6c4e4916 100644
--- a/core/src/main/scala/io/aiven/guardian/kafka/KafkaClientInterface.scala
+++ b/core/src/main/scala/io/aiven/guardian/kafka/KafkaClientInterface.scala
@@ -5,6 +5,7 @@ import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.SourceWithContext
 import io.aiven.guardian.kafka.models.ReducedConsumerRecord
 
+import scala.collection.immutable
 import scala.concurrent.Future
 
 trait KafkaClientInterface {
@@ -18,6 +19,10 @@ trait KafkaClientInterface {
     */
   type Control
 
+  /** The type that represents the result of batching a `CursorContext`
+    */
+  type BatchedCursorContext
+
   /** @return
     *   A `SourceWithContext` that returns a Kafka Stream which automatically handles committing of cursors
     */
@@ -26,5 +31,13 @@ trait KafkaClientInterface {
   /** @return
     *   A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message
     */
-  def commitCursor: Sink[CursorContext, Future[Done]]
+  def commitCursor: Sink[BatchedCursorContext, Future[Done]]
+
+  /** How to batch an immutable iterable of `CursorContext` into a `BatchedCursorContext`
+    * @param cursors
+    *   The cursors that need to be batched
+    * @return
+    *   A collection data structure that represents the batched cursors
+    */
+  def batchCursorContext(cursors: immutable.Iterable[CursorContext]): BatchedCursorContext
 }
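The contract change here is that callers no longer push individual `CursorContext` values into `commitCursor`; they first collapse a chunk of them with `batchCursorContext`. A rough sketch of a caller of this contract (illustrative only; `commitChunk` is a hypothetical helper, and the production wiring lives in `BackupClientInterface`):

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Source
import io.aiven.guardian.kafka.KafkaClientInterface

import scala.collection.immutable
import scala.concurrent.Future

object BatchedCommitSketch {
  // Collapse a chunk of per-record cursors into a single BatchedCursorContext and
  // stream it into commitCursor, materializing the commit's completion Future.
  def commitChunk(client: KafkaClientInterface)(
      cursors: immutable.Iterable[client.CursorContext]
  )(implicit system: ActorSystem): Future[Done] =
    Source
      .single(client.batchCursorContext(cursors))
      .toMat(client.commitCursor)(Keep.right)
      .run()
}
```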
@@ -107,16 +125,41 @@ object Generators {
     */
   def kafkaDataWithTimePeriodsGen(min: Int = 2,
                                   max: Int = 100,
-                                  padTimestampsMillis: Int = 10
+                                  padTimestampsMillis: Int = 10,
+                                  periodSliceFunction: List[ReducedConsumerRecord] => Gen[FiniteDuration] =
+                                    randomPeriodSliceBetweenMinMax,
+                                  condition: Option[List[ReducedConsumerRecord] => Boolean] = None
   ): Gen[KafkaDataWithTimePeriod] = for {
-    topic <- kafkaTopic
-    records <- Generators.kafkaReducedConsumerRecordsGen(topic, min, max, padTimestampsMillis)
-    head = records.head
-    last = records.last
-
-    duration <- Gen.choose[Long](head.timestamp, last.timestamp - 1).map(millis => FiniteDuration(millis, MILLISECONDS))
+    records  <- kafkaDateGen(min, max, padTimestampsMillis, condition)
+    duration <- periodSliceFunction(records)
   } yield KafkaDataWithTimePeriod(records, duration)
 
+  def reducedConsumerRecordsUntilSize(size: Long, toBytesFunc: List[ReducedConsumerRecord] => Array[Byte])(
+      reducedConsumerRecords: List[ReducedConsumerRecord]
+  ): Boolean =
+    toBytesFunc(reducedConsumerRecords).length > size
+
+  def timePeriodAlwaysGreaterThanAllMessages(reducedConsumerRecords: List[ReducedConsumerRecord]): Gen[FiniteDuration] =
+    Gen.const(FiniteDuration(reducedConsumerRecords.last.timestamp + 1, MILLISECONDS))
+
+  final case class KafkaDataInChunksWithTimePeriod(data: List[List[ReducedConsumerRecord]], periodSlice: FiniteDuration)
+
+  /** @param size
+    *   The minimum number of bytes per chunk
+    * @return
+    *   Chunks of [[ReducedConsumerRecord]] where each chunk is at least `size` bytes when serialized with `toBytesFunc`.
+    */
+  def kafkaDataWithMinSizeGen(size: Long,
+                              amount: Int,
+                              toBytesFunc: List[ReducedConsumerRecord] => Array[Byte]
+  ): Gen[KafkaDataInChunksWithTimePeriod] = {
+    val single = kafkaDateGen(1000, 10000, 10, Some(reducedConsumerRecordsUntilSize(size, toBytesFunc)))
+    for {
+      recordsSplitBySize <- Gen.sequence(List.fill(amount)(single)).map(_.asScala.toList)
+      duration           <- timePeriodAlwaysGreaterThanAllMessages(recordsSplitBySize.flatten)
+    } yield KafkaDataInChunksWithTimePeriod(recordsSplitBySize, duration)
+  }
+
   /** Generator for a valid Kafka topic that can be used in actual Kafka clusters
     */
   lazy val kafkaTopic: Gen[String] = for {
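As a usage sketch (hypothetical, not part of the change-set), the new `periodSliceFunction` hook can be exercised in a ScalaCheck property; with `timePeriodAlwaysGreaterThanAllMessages` every generated record should fall inside the period slice:

```scala
import io.aiven.guardian.kafka.Generators._
import org.scalacheck.Prop.forAll

object GeneratorUsageSketch {
  // With timePeriodAlwaysGreaterThanAllMessages the slice ends one millisecond past
  // the last record, so every record's timestamp fits inside it.
  val allRecordsInsideSlice = forAll(
    kafkaDataWithTimePeriodsGen(periodSliceFunction = timePeriodAlwaysGreaterThanAllMessages)
  ) { kafkaDataWithTimePeriod =>
    kafkaDataWithTimePeriod.data.forall(_.timestamp <= kafkaDataWithTimePeriod.periodSlice.toMillis)
  }
}
```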
diff --git a/core/src/test/scala/io/aiven/guardian/kafka/MockedKafkaClientInterface.scala b/core/src/test/scala/io/aiven/guardian/kafka/MockedKafkaClientInterface.scala
index 5ed9b743..19b7456c 100644
--- a/core/src/test/scala/io/aiven/guardian/kafka/MockedKafkaClientInterface.scala
+++ b/core/src/test/scala/io/aiven/guardian/kafka/MockedKafkaClientInterface.scala
@@ -7,6 +7,7 @@ import akka.stream.scaladsl.Source
 import akka.stream.scaladsl.SourceWithContext
 import io.aiven.guardian.kafka.models.ReducedConsumerRecord
 
+import scala.collection.immutable
 import scala.concurrent.Future
 import scala.jdk.CollectionConverters._
 
@@ -34,6 +35,10 @@ class MockedKafkaClientInterface(kafkaData: Source[ReducedConsumerRecord, NotUse
     */
   override type Control = Future[NotUsed]
 
+  /** The type that represents the result of batching a `CursorContext`
+    */
+  override type BatchedCursorContext = Long
+
   /** @return
     *   A `SourceWithContext` that returns a Kafka Stream which automatically handles committing of cursors
     */
@@ -49,4 +54,12 @@ class MockedKafkaClientInterface(kafkaData: Source[ReducedConsumerRecord, NotUse
     */
   override def commitCursor: Sink[Long, Future[Done]] = Sink.foreach(cursor => committedOffsets ++ Iterable(cursor))
 
+  /** How to batch an immutable iterable of `CursorContext` into a `BatchedCursorContext`
+    * @param cursors
+    *   The cursors that need to be batched
+    * @return
+    *   A collection data structure that represents the batched cursors
+    */
+  override def batchCursorContext(cursors: immutable.Iterable[Long]): Long = cursors.max
+
 }
diff --git a/core/src/test/scala/io/aiven/guardian/kafka/Utils.scala b/core/src/test/scala/io/aiven/guardian/kafka/Utils.scala
index 3371c5ca..d5daf353 100644
--- a/core/src/test/scala/io/aiven/guardian/kafka/Utils.scala
+++ b/core/src/test/scala/io/aiven/guardian/kafka/Utils.scala
@@ -1,11 +1,16 @@
 package io.aiven.guardian.kafka
 
+import akka.actor.ActorSystem
 import org.apache.kafka.common.KafkaFuture
 
 import scala.collection.immutable
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
+import scala.concurrent.Future
+import scala.jdk.DurationConverters._
 
+import java.time.OffsetDateTime
+import java.time.temporal.ChronoUnit
 import java.util.concurrent.CompletableFuture
 
 object Utils {
@@ -42,4 +47,38 @@ object Utils {
     }
   }
 
+  final case class UnsupportedTimeUnit(chronoUnit: ChronoUnit) extends Exception(s"$chronoUnit not supported")
+
+  private def recurseUntilHitTimeUnit(previousChronoUnit: ChronoUnit, buffer: BigDecimal)(implicit
+      system: ActorSystem
+  ): Future[Unit] = {
+    val now = OffsetDateTime.now()
+    val (current, max) = previousChronoUnit match {
+      case ChronoUnit.SECONDS =>
+        (now.getSecond, 59)
+      case ChronoUnit.MINUTES =>
+        (now.getMinute, 59)
+      case ChronoUnit.HOURS =>
+        (now.getHour, 23)
+      case ChronoUnit.DAYS =>
+        (now.getDayOfWeek.getValue - 1, 6)
+      case ChronoUnit.MONTHS =>
+        (now.getMonth.getValue - 1, 11)
+      case _ => throw UnsupportedTimeUnit(previousChronoUnit)
+    }
+
+    if (BigDecimal(current) / BigDecimal(max) * BigDecimal(100) <= buffer)
+      Future.successful(())
+    else
+      akka.pattern.after(previousChronoUnit.getDuration.toScala)(recurseUntilHitTimeUnit(previousChronoUnit, buffer))
+  }
+
+  def waitForStartOfTimeUnit(chronoUnit: ChronoUnit, buffer: BigDecimal = BigDecimal(5))(implicit
+      system: ActorSystem
+  ): Future[Unit] = {
+    val allEnums = ChronoUnit.values()
+    val previousEnum = allEnums(chronoUnit.ordinal - 1)
+    recurseUntilHitTimeUnit(previousEnum, buffer)
+  }
+
 }
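`waitForStartOfTimeUnit` parks the caller until the wall clock sits within `buffer` percent of the start of the given unit (for `ChronoUnit.MINUTES` and the default 5% buffer, roughly the first three seconds of a minute), retrying once per smaller unit otherwise. A small usage sketch, assuming it is awaited from test code before kicking off a time-bucketed run:

```scala
import akka.actor.ActorSystem
import io.aiven.guardian.kafka.Utils

import scala.concurrent.Await
import scala.concurrent.duration._

import java.time.temporal.ChronoUnit

object WaitForMinuteSketch extends App {
  implicit val system: ActorSystem = ActorSystem("wait-sketch")

  // Resolves once we are within the default 5% buffer of a minute boundary;
  // the worst-case wait is just under one minute.
  Await.result(Utils.waitForStartOfTimeUnit(ChronoUnit.MINUTES), 2.minutes)

  system.terminate()
}
```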