Add suspend and resume functionality
mdedetrich committed Jan 13, 2022

1 parent 4b9593f commit 441fd83
Showing 19 changed files with 890 additions and 107 deletions.
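In short, every storage implementation now has to (1) report whether a previously aborted upload exists for an object key and (2) accept each `ByteString` chunk together with its Kafka cursor context, so that cursors can be committed only after the corresponding chunk has been uploaded (as the S3 client below does). A standalone sketch of the new surface, distilled from the `BackupClientInterface` changes below (the member names match the diff; the trait itself is only illustrative):

import akka.stream.scaladsl.Sink
import akka.util.ByteString

import scala.concurrent.Future

trait ResumableBackupSketch {
  // context needed to commit a Kafka cursor once its bytes are safely stored
  type CursorContext
  // whatever is needed to resume a previously aborted upload, e.g. an S3 uploadId plus parts
  type CurrentState
  // implementation-specific result of a finished backup
  type BackupResult

  // look up a previously aborted upload for this object key, if any
  def getCurrentUploadState(key: String): Future[Option[CurrentState]]

  // the storage sink now receives the payload together with its Kafka cursor so the cursor
  // can be committed only after the corresponding chunk has been uploaded
  def backupToStorageSink(key: String,
                          currentState: Option[CurrentState]
  ): Sink[(ByteString, CursorContext), Future[BackupResult]]
}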
@@ -1,5 +1,6 @@
package io.aiven.guardian.kafka.backup.gcs

import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentTypes
import akka.stream.alpakka.google.GoogleAttributes
import akka.stream.alpakka.google.GoogleSettings
@@ -15,22 +16,35 @@ import io.aiven.guardian.kafka.gcs.configs.{GCS => GCSConfig}
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

// TODO: The GCS implementation currently does not work correctly because the current GCS implementation in
// Alpakka does not allow us to commit the Kafka cursor whenever chunks are uploaded
class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[GoogleSettings])(implicit
override val kafkaClientInterface: T,
override val backupConfig: Backup,
override val system: ActorSystem,
gcsConfig: GCSConfig
) extends BackupClientInterface[T] {

override def empty: () => Future[Option[StorageObject]] = () => Future.successful(None)

override type BackupResult = Option[StorageObject]

override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = {
override type CurrentState = Nothing

override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)

override def backupToStorageSink(key: String,
currentState: Option[Nothing]
): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {
val base = GCStorage
.resumableUpload(gcsConfig.dataBucket, key, ContentTypes.`application/json`)
.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))

maybeGoogleSettings.fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings)))
maybeGoogleSettings
.fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings)))
.contramap[(ByteString, kafkaClientInterface.CursorContext)] { case (byteString, _) =>
byteString
}
}

}
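Both storage clients adapt their storage sink to the new `(ByteString, CursorContext)` element type via `contramap`; the GCS client above simply discards the cursor because the underlying resumable upload cannot commit it. A minimal, self-contained illustration of that adaptation, assuming only akka-stream on the classpath (all names here are made up for the example):

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

object ContramapSketch extends App {
  implicit val system: ActorSystem = ActorSystem("contramap-sketch")

  // A sink that only understands plain payloads...
  val payloadSink = Sink.foreach[String](println)

  // ...adapted to consume (payload, context) tuples by dropping the context,
  // just like the GCS sink above drops the Kafka cursor it cannot commit.
  val withContext = payloadSink.contramap[(String, Long)] { case (payload, _) => payload }

  Source(List(("a", 1L), ("b", 2L)))
    .runWith(withContext)
    .onComplete(_ => system.terminate())(system.dispatcher)
}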
@@ -1,40 +1,168 @@
package io.aiven.guardian.kafka.backup.s3

import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.FailedUploadPart
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.alpakka.s3.Part
import akka.stream.alpakka.s3.S3Attributes
import akka.stream.alpakka.s3.S3Headers
import akka.stream.alpakka.s3.S3Settings
import akka.stream.alpakka.s3.SuccessfulUploadPart
import akka.stream.alpakka.s3.UploadPartResponse
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl._
import akka.util.ByteString
import com.typesafe.scalalogging.StrictLogging
import io.aiven.guardian.kafka.KafkaClientInterface
import io.aiven.guardian.kafka.backup.BackupClientInterface
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import java.time.Instant

final case class CurrentS3State(uploadId: String, parts: Seq[Part])

class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings])(implicit
override val kafkaClientInterface: T,
override val backupConfig: Backup,
override val system: ActorSystem,
s3Config: S3Config,
s3Headers: S3Headers
) extends BackupClientInterface[T] {
) extends BackupClientInterface[T]
with StrictLogging {

override def empty: () => Future[Option[MultipartUploadResult]] = () => Future.successful(None)

override type BackupResult = Option[MultipartUploadResult]

override def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]] = {
val base = S3
.multipartUploadWithHeaders(
s3Config.dataBucket,
key,
s3Headers = s3Headers,
chunkingParallelism = 1 // Parallelism is pointless for this usecase since we are directly streaming from Kafka
override type CurrentState = CurrentS3State

override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = {
implicit val ec: ExecutionContext = system.classicSystem.getDispatcher

val baseListMultipart = S3.listMultipartUpload(s3Config.dataBucket, None)

for {
incompleteUploads <-
maybeS3Settings
.fold(baseListMultipart)(s3Settings => baseListMultipart.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)
keys = incompleteUploads.filter(_.key == key)
result <- if (keys.isEmpty)
Future.successful(None)
else {
val listMultipartUploads = keys match {
case Seq(single) =>
logger.info(
s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
)
single
case rest =>
val last = rest.minBy(_.initiated)(Ordering[Instant].reverse)
logger.warn(
s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
)
last
}
val uploadId = listMultipartUploads.uploadId
val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)

for {
parts <- maybeS3Settings
.fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)

finalParts = parts.lastOption match {
case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize =>
parts
case _ =>
// We drop the last part here since it's broken, i.e. smaller than S3's minimum chunk size
parts.dropRight(1)
}
} yield Some(CurrentS3State(uploadId, finalParts.map(_.toPart)))
}
} yield result

}

private[s3] def failureSink
: Sink[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] = Sink
.foreach[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (failedPart, _) =>
logger.warn(
s"Failed to upload a chunk into S3 with bucket: ${failedPart.multipartUpload.bucket}, key: ${failedPart.multipartUpload.key}, uploadId: ${failedPart.multipartUpload.uploadId} and partNumber: ${failedPart.partNumber}",
failedPart.exception
)
.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
}

private[s3] def successSink
: Sink[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] =
kafkaClientInterface.commitCursor
.contramap[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (_, cursors) =>
kafkaClientInterface.batchCursorContext(cursors)
}

private[s3] def kafkaBatchSink
: Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] = {

val success = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
.collectType[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
.wireTap { data =>
val (part, _) = data
logger.info(
s"Committing kafka cursor for uploadId:${part.multipartUpload.uploadId} key: ${part.multipartUpload.key} and S3 part: ${part.partNumber}"
)
}
.toMat(successSink)(Keep.none)

val failure = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
.collectType[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
.toMat(failureSink)(Keep.none)

Sink.combine(success, failure)(Broadcast(_))
}

override def backupToStorageSink(key: String,
currentState: Option[CurrentS3State]
): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {

// Note that chunkingParallelism is pointless for this use case since we are directly streaming from Kafka.
// Furthermore, the real `KafkaClient` implementation uses `CommittableOffsetBatch` which is a global singleton so
// we can't have concurrent updates to this data structure.

val sink = currentState match {
case Some(state) =>
logger.info(
s"Resuming previous upload with uploadId: ${state.uploadId} and bucket: ${s3Config.dataBucket} with key: $key"
)
S3.resumeMultipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
s3Config.dataBucket,
key,
state.uploadId,
state.parts,
kafkaBatchSink,
s3Headers = s3Headers,
chunkingParallelism = 1
)
case None =>
logger.info(
s"Creating new upload with bucket: ${s3Config.dataBucket} and key: $key"
)
S3.multipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
s3Config.dataBucket,
key,
kafkaBatchSink,
s3Headers = s3Headers,
chunkingParallelism = 1
)
}

val base = sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
maybeS3Settings.fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
}
}
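`kafkaBatchSink` above fans every `UploadPartResponse` out to two sinks and lets each one keep only the subtype it cares about (successes commit the batched cursor, failures are only logged). The same routing pattern on plain values, as a self-contained sketch assuming only akka-stream (names invented for the example):

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, Keep, Sink, Source}

object CombineSinksSketch extends App {
  implicit val system: ActorSystem = ActorSystem("combine-sinks-sketch")

  // Broadcast every element to both sinks; each flow keeps only what it cares about,
  // mirroring how kafkaBatchSink separates SuccessfulUploadPart from FailedUploadPart.
  val evens = Flow[Int].filter(_ % 2 == 0).toMat(Sink.foreach[Int](n => println(s"commit: $n")))(Keep.none)
  val odds  = Flow[Int].filter(_ % 2 != 0).toMat(Sink.foreach[Int](n => println(s"log only: $n")))(Keep.none)

  Source(1 to 10).runWith(Sink.combine(evens, odds)(Broadcast(_)))

  // give the detached sinks a moment to print before shutting the system down
  Thread.sleep(500)
  system.terminate()
}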
@@ -0,0 +1,33 @@
package io.aiven.guardian.kafka.backup.s3

import akka.Done
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.S3Headers
import akka.stream.alpakka.s3.S3Settings
import akka.stream.alpakka.s3.SuccessfulUploadPart
import akka.stream.scaladsl.Sink
import io.aiven.guardian.kafka.KafkaClientInterface
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}

import scala.collection.immutable
import scala.concurrent.Future

import java.util.concurrent.ConcurrentLinkedQueue

class BackupClientChunkState[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings])(implicit
override val kafkaClientInterface: T,
override val backupConfig: Backup,
override val system: ActorSystem,
s3Config: S3Config,
s3Headers: S3Headers
) extends BackupClient[T](maybeS3Settings) {
val processedChunks: ConcurrentLinkedQueue[SuccessfulUploadPart] = new ConcurrentLinkedQueue[SuccessfulUploadPart]()

override val successSink
: Sink[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] =
super.successSink.contramap { case (part, value) =>
processedChunks.add(part)
(part, value)
}
}
@@ -1,7 +1,9 @@
package io.aiven.guardian.kafka.backup.s3

import akka.actor.Scheduler
import akka.stream.Attributes
import akka.stream.alpakka.s3.BucketAccess
import akka.stream.alpakka.s3.ListBucketResultContents
import akka.stream.alpakka.s3.S3Attributes
import akka.stream.alpakka.s3.S3Settings
import akka.stream.alpakka.s3.scaladsl.S3
@@ -14,6 +16,7 @@ import com.softwaremill.diffx.scalatest.DiffMatcher.matchTo
import com.typesafe.scalalogging.StrictLogging
import io.aiven.guardian.akka.AkkaHttpTestKit
import io.aiven.guardian.kafka.Generators._
import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst
import io.aiven.guardian.kafka.codecs.Circe._
import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import io.aiven.guardian.kafka.s3.Config
@@ -49,7 +52,9 @@ trait BackupClientSpec
with StrictLogging {

implicit val ec: ExecutionContext = system.dispatcher
implicit val defaultPatience: PatienceConfig = PatienceConfig(90 seconds, 100 millis)
implicit val defaultPatience: PatienceConfig = PatienceConfig(5 minutes, 100 millis)
implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
PropertyCheckConfiguration(minSuccessful = 1, minSize = 1)

val ThrottleElements: Int = 100
val ThrottleAmount: FiniteDuration = 1 millis
@@ -128,13 +133,41 @@ trait BackupClientSpec
case None => ()
}

/** @param dataBucket
* Which S3 bucket the objects are being persisted into
* @param transformResult
* A function that transforms the download result from S3 into the data `T` that you need. Note that you can also
* throw an exception in this transform function to trigger a retry (i.e. using it as an additional predicate)
* @param attempts
* Total number of attempts
* @param delay
* The delay between each attempt after the first
* @tparam T
* Type of the final result transformed by `transformResult`
* @return
*/
def waitForS3Download[T](dataBucket: String,
transformResult: Seq[ListBucketResultContents] => T,
attempts: Int = 10,
delay: FiniteDuration = 1 second
): Future[T] = {
implicit val scheduler: Scheduler = system.scheduler

val attempt = () =>
S3.listBucket(dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq).map {
transformResult
}

akka.pattern.retry(attempt, attempts, delay)
}

property("backup method completes flow correctly for all valid Kafka events") {
forAll(kafkaDataWithTimePeriodsGen(), s3ConfigGen(useVirtualDotHost, bucketPrefix)) {
(kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, s3Config: S3Config) =>
logger.info(s"Data bucket is ${s3Config.dataBucket}")
val backupClient = new MockedS3BackupClientInterface(
Source(kafkaDataWithTimePeriod.data).throttle(ThrottleElements, ThrottleAmount),
kafkaDataWithTimePeriod.periodSlice,
PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice),
s3Config,
Some(s3Settings)
)
@@ -171,7 +204,7 @@ trait BackupClientSpec
})
keysWithRecords <- Future.sequence(keysWithSource.map { case (key, source) =>
source
.via(CirceStreamSupport.decode[List[ReducedConsumerRecord]])
.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
.toMat(Sink.collection)(Keep.right)
.run()
.map(list => (key, list.flatten))
@@ -181,7 +214,9 @@ trait BackupClientSpec
OffsetDateTime.parse(date).toEpochSecond
}(Ordering[Long].reverse)
flattened = sorted.flatMap { case (_, records) => records }
} yield flattened
} yield flattened.collect { case Some(reducedConsumerRecord) =>
reducedConsumerRecord
}
val observed = calculatedFuture.futureValue

kafkaDataWithTimePeriod.data.containsSlice(observed) mustEqual true
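The decoder in the test above changed from `List[ReducedConsumerRecord]` to `List[Option[ReducedConsumerRecord]]` followed by a `collect { case Some(...) }`: the JSON arrays written by the backup client may now contain `null` entries (presumably from slices that were terminated or resumed mid-array), and circe decodes a JSON `null` into `None` when the element type is an `Option`. A tiny standalone check of that behaviour, assuming only circe-parser on the classpath:

import io.circe.parser.decode

object NullEntryDecodingSketch extends App {
  // circe maps a JSON null onto None for Option elements, so padding nulls in a backed-up
  // array survive decoding and can be dropped afterwards with collect or flatten
  println(decode[List[Option[Int]]]("[1, 2, null]")) // Right(List(Some(1), Some(2), None))
}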
@@ -0,0 +1,27 @@
package io.aiven.guardian.kafka.backup.s3

import akka.actor.ActorSystem
import akka.kafka.CommitterSettings
import akka.kafka.ConsumerMessage
import akka.kafka.ConsumerSettings
import akka.kafka.scaladsl.Consumer
import akka.stream.SharedKillSwitch
import akka.stream.scaladsl.SourceWithContext
import io.aiven.guardian.kafka.KafkaClient
import io.aiven.guardian.kafka.configs.KafkaCluster
import io.aiven.guardian.kafka.models.ReducedConsumerRecord

class KafkaClientWithKillSwitch(
configureConsumer: Option[
ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]
] = None,
configureCommitter: Option[
CommitterSettings => CommitterSettings
] = None,
killSwitch: SharedKillSwitch
)(implicit system: ActorSystem, kafkaClusterConfig: KafkaCluster)
extends KafkaClient(configureConsumer, configureCommitter) {
override def getSource
: SourceWithContext[ReducedConsumerRecord, ConsumerMessage.CommittableOffset, Consumer.Control] =
super.getSource.via(killSwitch.flow)
}
@@ -1,24 +1,25 @@
package io.aiven.guardian.kafka.backup.s3

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.S3Headers
import akka.stream.alpakka.s3.S3Settings
import akka.stream.scaladsl.Source
import io.aiven.guardian.kafka.MockedKafkaClientInterface
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.backup.configs.TimeConfiguration
import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}

import scala.concurrent.duration.FiniteDuration

class MockedS3BackupClientInterface(
kafkaData: Source[ReducedConsumerRecord, NotUsed],
periodSlice: FiniteDuration,
timeConfiguration: TimeConfiguration,
s3Config: S3Config,
maybeS3Settings: Option[S3Settings]
)(implicit val s3Headers: S3Headers)
)(implicit val s3Headers: S3Headers, system: ActorSystem)
extends BackupClient(maybeS3Settings)(new MockedKafkaClientInterface(kafkaData),
Backup(periodSlice),
Backup(timeConfiguration),
implicitly,
s3Config,
implicitly
)
@@ -1,13 +1,52 @@
package io.aiven.guardian.kafka.backup.s3

import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.KillSwitches
import akka.stream.SharedKillSwitch
import akka.stream.alpakka.s3.ListBucketResultContents
import akka.stream.alpakka.s3.S3Settings
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import io.aiven.guardian.akka.AnyPropTestKit
import io.aiven.guardian.kafka.Generators.KafkaDataInChunksWithTimePeriod
import io.aiven.guardian.kafka.Generators.kafkaDataWithMinSizeGen
import io.aiven.guardian.kafka.KafkaClient
import io.aiven.guardian.kafka.KafkaClusterTest
import io.aiven.guardian.kafka.Utils._
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice
import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst
import io.aiven.guardian.kafka.codecs.Circe._
import io.aiven.guardian.kafka.configs.KafkaCluster
import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import io.aiven.guardian.kafka.s3.Generators.s3ConfigGen
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.mdedetrich.akka.stream.support.CirceStreamSupport

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.language.postfixOps

class RealS3BackupClientSpec extends AnyPropTestKit(ActorSystem("RealS3BackupClientSpec")) with BackupClientSpec {
import java.time.temporal.ChronoUnit
import java.util.Base64

class RealS3BackupClientSpec
extends AnyPropTestKit(ActorSystem("RealS3BackupClientSpec"))
with BackupClientSpec
with KafkaClusterTest {
override lazy val s3Settings: S3Settings = S3Settings()

/** Virtual Dot Host in bucket names is disabled because you need an actual DNS certificate, otherwise AWS will fail
@@ -16,4 +55,289 @@ class RealS3BackupClientSpec extends AnyPropTestKit(ActorSystem("RealS3BackupCli
override lazy val useVirtualDotHost: Boolean = false
override lazy val bucketPrefix: Option[String] = Some("guardian-")
override lazy val enableCleanup: Option[FiniteDuration] = Some(5 seconds)

/** Timeout constant to wait for both Akka Streams initialization and consumer/Kafka cluster initialization
*/
val KafkaInitializationTimeoutConstant: FiniteDuration = AkkaStreamInitializationConstant + (2.5 seconds)

case object TerminationException extends Exception("termination-exception")

def reducedConsumerRecordsToJson(reducedConsumerRecords: List[ReducedConsumerRecord]): Array[Byte] = {
import io.aiven.guardian.kafka.codecs.Circe._
import io.circe.syntax._
reducedConsumerRecords.asJson.noSpaces.getBytes
}

def baseKafkaConfig: Some[ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]] =
Some(
_.withBootstrapServers(
container.bootstrapServers
).withGroupId("test-group")
)

def createKafkaClient(
killSwitch: SharedKillSwitch
)(implicit kafkaClusterConfig: KafkaCluster): KafkaClientWithKillSwitch =
new KafkaClientWithKillSwitch(
configureConsumer = baseKafkaConfig,
killSwitch = killSwitch
)

/** Converts a generated list of `ReducedConsumerRecord` to a list of `ProducerRecord`
* @param data
* List of `ReducedConsumerRecord`'s generated by scalacheck
* @return
* A list of `ProducerRecord`. Note that it only uses the `key`/`value` and ignores other values
*/
def toProducerRecords(data: List[ReducedConsumerRecord]): List[ProducerRecord[Array[Byte], Array[Byte]]] = data.map {
reducedConsumerRecord =>
val keyAsByteArray = Base64.getDecoder.decode(reducedConsumerRecord.key)
val valueAsByteArray = Base64.getDecoder.decode(reducedConsumerRecord.value)
new ProducerRecord[Array[Byte], Array[Byte]](reducedConsumerRecord.topic, keyAsByteArray, valueAsByteArray)
}

/** Converts a list of `ProducerRecord` to a source that is streamed over a period of time
* @param producerRecords
* The list of producer records
* @param streamDuration
* The period over which the records will be streamed
* @return
* Source ready to be passed into a Kafka producer
*/
def toSource(
producerRecords: List[ProducerRecord[Array[Byte], Array[Byte]]],
streamDuration: FiniteDuration
): Source[ProducerRecord[Array[Byte], Array[Byte]], NotUsed] = {
val durationToMillis = streamDuration.toMillis
val recordsPerMilli = producerRecords.size / durationToMillis
Source(producerRecords).throttle(recordsPerMilli.toInt, 1 millis)
}

/** Call this function to send a message after the configured time period has elapsed, triggering a rollover so the
* current object finishes processing
* @param duration
* @param producerSettings
* @param topic
* @return
*/
def sendTopicAfterTimePeriod(duration: FiniteDuration,
producerSettings: ProducerSettings[Array[Byte], Array[Byte]],
topic: String
): Future[Done] = akka.pattern.after(duration) {
Source(
List(
new ProducerRecord[Array[Byte], Array[Byte]](topic, "1".getBytes, "1".getBytes)
)
).runWith(Producer.plainSink(producerSettings))
}

def createProducer(): ProducerSettings[Array[Byte], Array[Byte]] =
ProducerSettings(system, new ByteArraySerializer, new ByteArraySerializer)
.withBootstrapServers(container.bootstrapServers)

case class DownloadNotReady(downloads: Seq[ListBucketResultContents])
extends Exception(s"Download not ready, current state is ${downloads.map(_.toString).mkString(",")}")

def getKeyFromSingleDownload(dataBucket: String): Future[String] = waitForS3Download(
dataBucket,
{
case Seq(single) => single.key
case rest =>
throw DownloadNotReady(rest)
}
)

property("entire flow works properly from start to end") {
forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix)
) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
logger.info(s"Data bucket is ${s3Config.dataBucket}")

val data = kafkaDataInChunksWithTimePeriod.data.flatten

val topics = data.map(_.topic).toSet

val asProducerRecords = toProducerRecords(data)
val baseSource = toSource(asProducerRecords, 30 seconds)

implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics)

implicit val config: S3Config = s3Config
implicit val backupConfig: Backup = Backup(PeriodFromFirst(1 minute))

val producerSettings = createProducer()

val backupClient =
new BackupClient(Some(s3Settings))(new KafkaClient(configureConsumer = baseKafkaConfig),
implicitly,
implicitly,
implicitly,
implicitly
)

val adminClient = AdminClient.create(
Map[String, AnyRef](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> container.bootstrapServers
).asJava
)

val createTopics = adminClient.createTopics(topics.map { topic =>
new NewTopic(topic, 1, 1.toShort)
}.asJava)

val calculatedFuture = for {
_ <- createTopics.all().toCompletableFuture.asScala
_ <- createBucket(s3Config.dataBucket)
_ = backupClient.backup.run()
_ <- akka.pattern.after(KafkaInitializationTimeoutConstant)(
baseSource
.runWith(Producer.plainSink(producerSettings))
)
_ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head)
key <- getKeyFromSingleDownload(s3Config.dataBucket)
downloaded <-
S3.download(s3Config.dataBucket, key)
.withAttributes(s3Attrs)
.runWith(Sink.head)
.flatMap {
case Some((downloadSource, _)) =>
downloadSource.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]).runWith(Sink.seq)
case None => throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key")
}

} yield downloaded.toList.flatten.collect { case Some(reducedConsumerRecord) =>
reducedConsumerRecord
}

val downloaded = calculatedFuture.futureValue

val downloadedGroupedAsKey = downloaded
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

val inputAsKey = data
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

downloadedGroupedAsKey mustEqual inputAsKey
}
}

def waitUntilBackupClientHasCommitted(backupClient: BackupClientChunkState[_],
step: FiniteDuration = 100 millis,
delay: FiniteDuration = 5 seconds
): Future[Unit] =
if (backupClient.processedChunks.size() > 0)
akka.pattern.after(delay)(Future.successful(()))
else
akka.pattern.after(step)(waitUntilBackupClientHasCommitted(backupClient, step, delay))

property("suspend/resume works correctly") {
forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix)
) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
logger.info(s"Data bucket is ${s3Config.dataBucket}")

val data = kafkaDataInChunksWithTimePeriod.data.flatten

val topics = data.map(_.topic).toSet

implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics)

implicit val config: S3Config = s3Config
implicit val backupConfig: Backup = Backup(ChronoUnitSlice(ChronoUnit.MINUTES))

val producerSettings = createProducer()

val firstKillSwitch = KillSwitches.shared("first-kill-switch")

val backupClient =
new BackupClientChunkState(Some(s3Settings))(createKafkaClient(firstKillSwitch),
implicitly,
implicitly,
implicitly,
implicitly
)

val asProducerRecords = toProducerRecords(data)
val baseSource = toSource(asProducerRecords, 30 seconds)

val adminClient = AdminClient.create(
Map[String, AnyRef](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> container.bootstrapServers
).asJava
)

val createTopics = adminClient.createTopics(topics.map { topic =>
new NewTopic(topic, 1, 1.toShort)
}.asJava)

val calculatedFuture = for {
_ <- createTopics.all().toCompletableFuture.asScala
_ <- createBucket(s3Config.dataBucket)
_ = backupClient.backup.run()
_ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES)
_ = baseSource.runWith(Producer.plainSink(producerSettings))
_ <- waitUntilBackupClientHasCommitted(backupClient)
_ = firstKillSwitch.abort(TerminationException)
secondBackupClient <- akka.pattern.after(2 seconds) {
Future {
new BackupClient(Some(s3Settings))(
new KafkaClient(configureConsumer = baseKafkaConfig),
implicitly,
implicitly,
implicitly,
implicitly
)
}
}
_ = secondBackupClient.backup.run()
_ <- sendTopicAfterTimePeriod(1 minutes, producerSettings, topics.head)
key <- getKeyFromSingleDownload(s3Config.dataBucket)
downloaded <- S3.download(s3Config.dataBucket, key)
.withAttributes(s3Attrs)
.runWith(Sink.head)
.flatMap {
case Some((downloadSource, _)) =>
downloadSource
.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
.runWith(Sink.seq)
case None =>
throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key")
}

} yield downloaded.toList.flatten.collect { case Some(reducedConsumerRecord) =>
reducedConsumerRecord
}

val downloaded = calculatedFuture.futureValue

// Only care about ordering when it comes to key
val downloadedGroupedAsKey = downloaded
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

val inputAsKey = data
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

downloadedGroupedAsKey mustEqual inputAsKey
}
}
}
20 changes: 11 additions & 9 deletions build.sbt
@@ -10,7 +10,7 @@ ThisBuild / resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/c
val akkaVersion = "2.6.17"
val akkaHttpVersion = "10.2.7"
val alpakkaKafkaVersion = "2.1.1"
val alpakkaVersion = "3.0.2+34-bdac5519+20211013-1607-SNAPSHOT"
val alpakkaVersion = "3.0.4"
val quillJdbcMonixVersion = "3.7.2"
val postgresqlJdbcVersion = "42.2.24"
val scalaLoggingVersion = "3.9.4"
@@ -23,6 +23,7 @@ val akkaStreamsJson = "0.8.0"
val diffxVersion = "0.5.6"
val testContainersVersion = "0.39.8"
val testContainersJavaVersion = "1.16.2"
val scalaCheckVersion = "1.15.5-1-SNAPSHOT"

val flagsFor12 = Seq(
"-Xlint:_",
@@ -86,6 +87,7 @@ lazy val core = project
"com.typesafe.akka" %% "akka-stream" % akkaVersion % Test,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test,
"org.mdedetrich" %% "scalacheck" % scalaCheckVersion % Test,
"com.softwaremill.diffx" %% "diffx-scalatest" % diffxVersion % Test,
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % Test,
@@ -101,10 +103,10 @@ lazy val coreS3 = project
librarySettings,
name := s"$baseName-s3",
libraryDependencies ++= Seq(
"org.mdedetrich" %% "akka-stream-alpakka-s3" % alpakkaVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test,
"com.typesafe.akka" %% "akka-http-xml" % akkaHttpVersion % Test
"com.lightbend.akka" %% "akka-stream-alpakka-s3" % alpakkaVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test,
"com.typesafe.akka" %% "akka-http-xml" % akkaHttpVersion % Test
)
)
.dependsOn(core % "compile->compile;test->test")
@@ -115,10 +117,10 @@ lazy val coreGCS = project
librarySettings,
name := s"$baseName-gcs",
libraryDependencies ++= Seq(
"org.mdedetrich" %% "akka-stream-alpakka-google-cloud-storage" % alpakkaVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test,
"com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion % Test
"com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-storage" % alpakkaVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.scalatestplus" %% "scalacheck-1-15" % scalaTestScalaCheckVersion % Test,
"com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion % Test
)
)
.dependsOn(core % "compile->compile;test->test")
@@ -1,18 +1,23 @@
package io.aiven.guardian.kafka.backup

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.util.ByteString
import io.aiven.guardian.kafka.Errors
import io.aiven.guardian.kafka.KafkaClientInterface
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice
import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst
import io.aiven.guardian.kafka.backup.configs.TimeConfiguration
import io.aiven.guardian.kafka.codecs.Circe._
import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import io.circe.syntax._

import scala.annotation.nowarn
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.jdk.DurationConverters._

import java.time._
import java.time.format.DateTimeFormatter
@@ -25,6 +30,7 @@ import java.time.temporal._
trait BackupClientInterface[T <: KafkaClientInterface] {
implicit val kafkaClientInterface: T
implicit val backupConfig: Backup
implicit val system: ActorSystem

/** An element from the original record
*/
@@ -53,15 +59,33 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
*/
type BackupResult

/** Override this type to define the result of calculating the previous state (if it exists)
*/
type CurrentState

import BackupClientInterface._

/** Override this method to define how to backup a `ByteString` to a `DataSource`
/** Override this method to define how to retrieve the current state of a backup.
* @param key
* The object key or filename for what is being backed up
* @return
* A [[Future]] that contains the state if a previously aborted backup was found. Return [[None]] if
* it's a brand new backup.
*/
def getCurrentUploadState(key: String): Future[Option[CurrentState]]

/** Override this method to define how to backup a `ByteString` combined with Kafka
* `kafkaClientInterface.CursorContext` to a `DataSource`
* @param key
* The object key or filename for what is being backed up
* @param currentState
* The current state if it exists. This is used when resuming from a previously aborted backup
* @return
* A Sink that also provides a `BackupResult`
*/
def backupToStorageSink(key: String): Sink[ByteString, Future[BackupResult]]
def backupToStorageSink(key: String,
currentState: Option[CurrentState]
): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]]

/** Override this method to define a zero value that covers the case that occurs immediately when `SubFlow` has been
* split at `BackupStreamPosition.Start`. If you have difficulties defining an empty value for `BackupResult` then
@@ -134,7 +158,7 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
kafkaClientInterface.CursorContext,
kafkaClientInterface.Control
] = SourceWithContext.fromTuples(source.map { case (firstTimestamp, (record, context)) =>
val period = calculateNumberOfPeriodsFromTimestamp(firstTimestamp, backupConfig.periodSlice, record)
val period = calculateNumberOfPeriodsFromTimestamp(firstTimestamp, backupConfig.timeConfiguration, record)
((record, period), context)
})

@@ -247,18 +271,37 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
}
}

/** Prepares the sink before it gets handed to `backupToStorageSink`
*/
private[backup] def prepareStartOfStream(state: Option[CurrentState],
start: Start
): Sink[ByteStringElement, Future[BackupResult]] =
if (state.isDefined)
Flow[ByteStringElement]
.flatMapPrefix(1) {
case Seq(byteStringElement: Start) =>
val withoutStartOfJsonArray = byteStringElement.data.drop(1)
Flow[ByteStringElement].prepend(
Source.single(byteStringElement.copy(data = withoutStartOfJsonArray))
)
case _ => throw Errors.ExpectedStartOfSource
}
.toMat(backupToStorageSink(start.key, state).contramap[ByteStringElement] { byteStringElement =>
(byteStringElement.data, byteStringElement.context)
})(Keep.right)
else
backupToStorageSink(start.key, state)
.contramap[ByteStringElement] { byteStringElement =>
(byteStringElement.data, byteStringElement.context)
}
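// Illustration only (not part of the interface): when resuming, the parts already uploaded
// for this key start the JSON array, so the resumed Start element drops its leading '['
// (`withoutStartOfJsonArray` above); otherwise the completed object would contain a doubled
// opening bracket. With akka.util.ByteString the adjustment is simply:
//
//   ByteString("""[{"topic":"t1"}""").drop(1)   // yields {"topic":"t1"}, safe to append to the existing upload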

/** The entire flow that involves reading from Kafka, transforming the data into JSON and then backing it up into a
* data source.
* @return
* The `KafkaClientInterface.Control` which depends on the implementation of `T` (typically this is used to control
* the underlying stream).
*/
def backup: RunnableGraph[kafkaClientInterface.Control] = {
// TODO use https://awscli.amazonaws.com/v2/documentation/api/latest/reference/s3api/list-multipart-uploads.html
// and https://stackoverflow.com/questions/53764876/resume-s3-multipart-upload-partetag to find any in progress
// multiupload to resume from previous termination. Looks like we will have to do this manually since its not in
// Alpakka yet

val withBackupStreamPositions = calculateBackupStreamPositions(sourceWithPeriods(sourceWithFirstRecord))

val split = withBackupStreamPositions
@@ -275,10 +318,10 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
case (Seq(only: Element, End), _) =>
// This case only occurs when you have a single element in a timeslice.
// We have to terminate immediately to create a JSON array with a single element
val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime)
val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime, backupConfig.timeConfiguration)
Source(transformFirstElement(only, key, terminate = true))
case (Seq(first: Element, second: Element), restOfReducedConsumerRecords) =>
val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime)
val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime, backupConfig.timeConfiguration)
val firstSource = transformFirstElement(first, key, terminate = false)

val rest = Source.combine(
@@ -303,33 +346,27 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
)(Concat(_))
case (Seq(only: Element), _) =>
// This case can also occur when user terminates the stream
val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime)
val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime, backupConfig.timeConfiguration)
Source(transformFirstElement(only, key, terminate = false))
case (rest, _) =>
throw Errors.UnhandledStreamCase(rest)
}

// Note that .alsoTo triggers after .to, see https://stackoverflow.com/questions/47895991/multiple-sinks-in-the-same-stream#comment93028098_47896071
@nowarn("msg=method lazyInit in object Sink is deprecated")
val subFlowSink = substreams
.alsoTo(kafkaClientInterface.commitCursor.contramap[ByteStringElement] { byteStringElement =>
byteStringElement.context
})
.to(
// See https://stackoverflow.com/questions/68774425/combine-prefixandtail1-with-sink-lazysink-for-subflow-created-by-splitafter/68776660?noredirect=1#comment121558518_68776660
Sink.lazyInit(
{
case start: Start =>
Future.successful(
backupToStorageSink(start.key).contramap[ByteStringElement] { byteStringElement =>
byteStringElement.data
}
)
case _ => throw Errors.ExpectedStartOfSource
},
empty
)
val subFlowSink = substreams.to(
// See https://stackoverflow.com/questions/68774425/combine-prefixandtail1-with-sink-lazysink-for-subflow-created-by-splitafter/68776660?noredirect=1#comment121558518_68776660
Sink.lazyInit(
{
case start: Start =>
implicit val ec: ExecutionContext = system.getDispatcher
for {
state <- getCurrentUploadState(start.key)
} yield prepareStartOfStream(state, start)
case _ => throw Errors.ExpectedStartOfSource
},
empty
)
)

subFlowSink
}
@@ -348,8 +385,14 @@ object BackupClientInterface {
* @return
* A `String` that can be used either as some object key or a filename
*/
def calculateKey(offsetDateTime: OffsetDateTime): String =
s"${BackupClientInterface.formatOffsetDateTime(offsetDateTime)}.json"
def calculateKey(offsetDateTime: OffsetDateTime, timeConfiguration: TimeConfiguration): String = {
val finalTime = timeConfiguration match {
case ChronoUnitSlice(chronoUnit) => offsetDateTime.truncatedTo(chronoUnit)
case _ => offsetDateTime
}

s"${BackupClientInterface.formatOffsetDateTime(finalTime)}.json"
}

/** Calculates whether we have rolled over a time period given number of divided periods.
* @param dividedPeriodsBefore
@@ -369,9 +412,16 @@ object BackupClientInterface {
}

protected def calculateNumberOfPeriodsFromTimestamp(initialTime: OffsetDateTime,
period: FiniteDuration,
timeConfiguration: TimeConfiguration,
reducedConsumerRecord: ReducedConsumerRecord
): Long =
): Long = {
val (period, finalInitialTime) = timeConfiguration match {
case PeriodFromFirst(duration) => (duration, initialTime)
case ChronoUnitSlice(chronoUnit) =>
(chronoUnit.getDuration.toScala, initialTime.truncatedTo(chronoUnit))
}

// TODO handle overflow?
ChronoUnit.MICROS.between(initialTime, reducedConsumerRecord.toOffsetDateTime) / period.toMicros
ChronoUnit.MICROS.between(finalInitialTime, reducedConsumerRecord.toOffsetDateTime) / period.toMicros
}
}
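A worked example of the arithmetic above: with `PeriodFromFirst(1 minute)`, the period index is simply the number of whole minutes between the first record and the current record (only `java.time` and `scala.concurrent.duration` are assumed):

import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit

import scala.concurrent.duration._

object PeriodArithmeticSketch extends App {
  // A record arriving 150 seconds after the first record lands in period 2:
  // 0-59s -> period 0, 60-119s -> period 1, 120-179s -> period 2.
  val firstTimestamp = OffsetDateTime.parse("2022-01-13T10:00:30Z")
  val recordTime     = firstTimestamp.plusSeconds(150)
  println(ChronoUnit.MICROS.between(firstTimestamp, recordTime) / 1.minute.toMicros) // 2
}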
@@ -1,8 +1,6 @@
package io.aiven.guardian.kafka.backup.configs

import scala.concurrent.duration.FiniteDuration

/** @param periodSlice
* The time period for each given slice that stores all of the `ReducedConsumerRecord`
/** @param timeConfiguration
* Determines how the backed up objects/files are segregated depending on a time configuration
*/
final case class Backup(periodSlice: FiniteDuration)
final case class Backup(timeConfiguration: TimeConfiguration)
@@ -0,0 +1,23 @@
package io.aiven.guardian.kafka.backup.configs

import scala.concurrent.duration.Duration

import java.time.temporal.ChronoUnit

sealed trait TimeConfiguration

/** Backs up objects/files depending on the timestamp of the first received Kafka message. Suspending/resuming the
* backup client will always create a new object/file
* @param duration
* The maximum span of time for each object/file, when this duration is exceeded a new file is created
*/
final case class PeriodFromFirst(duration: Duration) extends TimeConfiguration

/** Backs up objects/files by collecting received Kafka messages into a single time slice based on a [[ChronoUnit]].
* When suspending/resuming the backup client, this option will reuse existing objects/keys if they fall into the
* currently configured `chronoUnit`.
* @param chronoUnit
* Kafka messages whose timestamps fall within the configured [[ChronoUnit]] will be placed into the same
* object/file.
*/
final case class ChronoUnitSlice(chronoUnit: ChronoUnit) extends TimeConfiguration
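For example, with `ChronoUnitSlice(ChronoUnit.MINUTES)` two records that arrive within the same wall-clock minute truncate to the same timestamp and therefore end up in the same object/key, which is what allows a resumed backup client to pick up an unfinished upload (plain `java.time`, no Guardian code assumed):

import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit

object ChronoUnitSliceSketch extends App {
  val a = OffsetDateTime.parse("2022-01-13T10:37:05Z").truncatedTo(ChronoUnit.MINUTES)
  val b = OffsetDateTime.parse("2022-01-13T10:37:45Z").truncatedTo(ChronoUnit.MINUTES)
  println(a == b) // true: both truncate to 2022-01-13T10:37Z, i.e. the same time slice
}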
@@ -8,6 +8,7 @@ import io.aiven.guardian.akka.AkkaStreamTestKit
import io.aiven.guardian.akka.AnyPropTestKit
import io.aiven.guardian.kafka.Generators.KafkaDataWithTimePeriod
import io.aiven.guardian.kafka.Generators.kafkaDataWithTimePeriodsGen
import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst
import io.aiven.guardian.kafka.codecs.Circe._
import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import org.apache.kafka.common.record.TimestampType
@@ -41,7 +42,7 @@ class BackupClientInterfaceSpec
forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) =>
val mock =
new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data),
kafkaDataWithTimePeriod.periodSlice
PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice)
)

val calculatedFuture = mock.materializeBackupStreamPositions()
@@ -58,7 +59,7 @@ class BackupClientInterfaceSpec
forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) =>
val mock =
new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data),
kafkaDataWithTimePeriod.periodSlice
PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice)
)

val result = mock.materializeBackupStreamPositions().futureValue.toList
@@ -93,7 +94,7 @@ class BackupClientInterfaceSpec
forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) =>
val mock =
new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data),
kafkaDataWithTimePeriod.periodSlice
PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice)
)

val result = mock.materializeBackupStreamPositions().futureValue.toList
@@ -118,7 +119,7 @@ class BackupClientInterfaceSpec
(kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) =>
val mock =
new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data),
kafkaDataWithTimePeriod.periodSlice
PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice)
)

mock.clear()
@@ -161,7 +162,7 @@ class BackupClientInterfaceSpec
(kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) =>
val mock =
new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data),
kafkaDataWithTimePeriod.periodSlice
PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice)
)

mock.clear()
@@ -199,7 +200,7 @@ class BackupClientInterfaceSpec
val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source.single(
reducedConsumerRecord
),
1 day
PeriodFromFirst(1 day)
)
mock.clear()
val calculatedFuture = for {
@@ -238,7 +239,7 @@ class BackupClientInterfaceSpec
val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source(
reducedConsumerRecords
),
1 millis
PeriodFromFirst(1 millis)
)
mock.clear()
val calculatedFuture = for {
@@ -273,7 +274,7 @@ class BackupClientInterfaceSpec
(kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) =>
val mock =
new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data),
kafkaDataWithTimePeriod.periodSlice
PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice)
)

mock.clear()
@@ -10,11 +10,11 @@ import akka.util.ByteString
import io.aiven.guardian.kafka.MockedKafkaClientInterface
import io.aiven.guardian.kafka.Utils._
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.backup.configs.TimeConfiguration
import io.aiven.guardian.kafka.models.ReducedConsumerRecord

import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

import java.time.OffsetDateTime
@@ -25,8 +25,9 @@ import java.util.concurrent.ConcurrentLinkedQueue
* @param timeConfiguration
*/
class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafkaClientInterface,
periodSlice: FiniteDuration
) extends BackupClientInterface[MockedKafkaClientInterface] {
timeConfiguration: TimeConfiguration
)(implicit override val system: ActorSystem)
extends BackupClientInterface[MockedKafkaClientInterface] {

/** The collection that receives the data as it's being submitted, where each value is the key along with the
* `ByteString`. Use `mergeBackedUpData` to process `backedUpData` into a more convenient data structure once you
@@ -72,13 +73,17 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka
def clear(): Unit = backedUpData.clear()

override implicit lazy val backupConfig: Backup = Backup(
periodSlice
timeConfiguration
)

/** Override this type to define the result of backing up data to a datasource
*/
override type BackupResult = Done

override type CurrentState = Nothing

override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)

override def empty: () => Future[Done] = () => Future.successful(Done)

/** Override this method to define how to backup a `ByteString` to a `DataSource`
@@ -88,9 +93,12 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka
* @return
* A Sink that also provides a `BackupResult`
*/
override def backupToStorageSink(key: String): Sink[ByteString, Future[Done]] = Sink.foreach { byteString =>
backedUpData.add((key, byteString))
}
override def backupToStorageSink(key: String,
currentState: Option[Nothing]
): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[Done]] =
Sink.foreach { case (byteString, _) =>
backedUpData.add((key, byteString))
}

def materializeBackupStreamPositions()(implicit
system: ActorSystem
@@ -103,5 +111,6 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka
/** A `MockedBackupClientInterface` that also uses a mocked `KafkaClientInterface`
*/
class MockedBackupClientInterfaceWithMockedKafkaData(kafkaData: Source[ReducedConsumerRecord, NotUsed],
periodSlice: FiniteDuration
) extends MockedBackupClientInterface(new MockedKafkaClientInterface(kafkaData), periodSlice)
timeConfiguration: TimeConfiguration
)(implicit override val system: ActorSystem)
extends MockedBackupClientInterface(new MockedKafkaClientInterface(kafkaData), timeConfiguration)
5 changes: 3 additions & 2 deletions core/src/main/resources/reference.conf
@@ -28,11 +28,12 @@ akka.kafka.consumer = {
}

akka.kafka.committer = {
max-batch = 100000
max-batch = ${?AKKA_KAFKA_COMMITTER_MAX_BATCH}
max-interval = 1 hour
max-interval = ${?AKKA_KAFKA_COMMITTER_MAX_INTERVAL}
parallelism = ${?AKKA_KAFKA_COMMITTER_PARALLELISM}
delivery = ${?AKKA_KAFKA_COMMITTER_DELIVERY}
when = ${?AKKA_KAFKA_COMMITTER_WHEN}
parallelism = 10000
}

kafka-cluster = {
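The `key = ${?SOME_ENV_VAR}` lines above follow the usual HOCON pattern: the literal default on the preceding line stays in effect unless the named environment variable is set, in which case it overrides the default. A hedged sketch of reading the resulting committer block (assumes the Guardian/akka-kafka classpath so that `akka.kafka.committer` resolves):

import com.typesafe.config.ConfigFactory

object CommitterConfigSketch extends App {
  val committer = ConfigFactory.load().getConfig("akka.kafka.committer")
  println(committer.getString("max-batch"))    // "100000" unless AKKA_KAFKA_COMMITTER_MAX_BATCH is set
  println(committer.getString("max-interval")) // "1 hour" unless AKKA_KAFKA_COMMITTER_MAX_INTERVAL is set
}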
47 changes: 38 additions & 9 deletions core/src/main/scala/io/aiven/guardian/kafka/KafkaClient.scala
@@ -2,9 +2,10 @@ package io.aiven.guardian.kafka

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.CommitDelivery
import akka.kafka.CommitterSettings
import akka.kafka.ConsumerMessage.Committable
import akka.kafka.ConsumerMessage.CommittableOffset
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.ConsumerSettings
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Committer
@@ -14,44 +15,58 @@ import akka.stream.scaladsl.SourceWithContext
import com.typesafe.scalalogging.StrictLogging
import io.aiven.guardian.kafka.configs.KafkaCluster
import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer

import scala.collection.immutable
import scala.concurrent.Future

import java.util.Base64

/** A Kafka Client that uses Alpakka Kafka Consumer under the hood to create a stream of events from a Kafka cluster. To
* configure the Alpakka Kafka Consumer, use the standard Typesafe configuration, i.e. akka.kafka.consumer (note that the
* key and value deserializers are hardcoded so you cannot override them).
* @param configure
* @param configureConsumer
* A way to configure the underlying Kafka consumer settings
* @param configureCommitter
* A way to configure the underlying kafka committer settings
* @param system
* A classic `ActorSystem`
* @param kafkaClusterConfig
* Additional cluster configuration that is needed
*/
class KafkaClient(
configure: Option[ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]] = None
configureConsumer: Option[
ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]
] = None,
configureCommitter: Option[
CommitterSettings => CommitterSettings
] = None
)(implicit system: ActorSystem, kafkaClusterConfig: KafkaCluster)
extends KafkaClientInterface
with StrictLogging {
override type CursorContext = Committable
override type Control = Consumer.Control
override type CursorContext = CommittableOffset
override type Control = Consumer.Control
override type BatchedCursorContext = CommittableOffsetBatch

if (kafkaClusterConfig.topics.isEmpty)
logger.warn("Kafka Cluster configuration has no topics set")

private[kafka] val consumerSettings = {
val base = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
configure.fold(base)(block => block(base))
configureConsumer
.fold(base)(block => block(base))
.withProperties(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
)
}

private[kafka] val subscriptions = Subscriptions.topics(kafkaClusterConfig.topics)

/** @return
* A `SourceWithContext` that returns a Kafka Stream which automatically handles committing of cursors
*/
override val getSource: SourceWithContext[ReducedConsumerRecord, CommittableOffset, Consumer.Control] =
override def getSource: SourceWithContext[ReducedConsumerRecord, CommittableOffset, Consumer.Control] =
Consumer
.sourceWithOffsetContext(consumerSettings, subscriptions)
.map(consumerRecord =>
@@ -65,10 +80,24 @@ class KafkaClient(
)
)

private[kafka] val committerSettings: CommitterSettings = CommitterSettings(system)
private[kafka] val committerSettings: CommitterSettings = {
val base = CommitterSettings(system)
configureCommitter
.fold(base)(block => block(base))
.withDelivery(CommitDelivery.waitForAck)
}

/** @return
* A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message
*/
override val commitCursor: Sink[Committable, Future[Done]] = Committer.sink(committerSettings)
override def commitCursor: Sink[CommittableOffsetBatch, Future[Done]] = Committer.sink(committerSettings)

/** How to batch an immutable iterable of `CursorContext` into a `BatchedCursorContext`
* @param cursors
* The cursors that need to be batched
* @return
* A collection data structure that represents the batched cursors
*/
override def batchCursorContext(cursors: immutable.Iterable[CommittableOffset]): CommittableOffsetBatch =
CommittableOffsetBatch(cursors.toSeq)
}
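A hedged construction sketch showing the two hooks together; the bootstrap server, group id, and batch size below are made-up example values, and an `ActorSystem` plus `KafkaCluster` configuration are provided exactly as the constructor requires:

import akka.actor.ActorSystem
import io.aiven.guardian.kafka.KafkaClient
import io.aiven.guardian.kafka.configs.KafkaCluster

object KafkaClientConfigSketch extends App {
  implicit val system: ActorSystem              = ActorSystem("kafka-client-sketch")
  implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(Set("example-topic"))

  // configureConsumer existed before this commit; configureCommitter is the new hook. The
  // client still hardcodes the byte-array deserializers, AUTO_OFFSET_RESET_CONFIG=earliest
  // and CommitDelivery.waitForAck on top of whatever is configured here.
  val client = new KafkaClient(
    configureConsumer = Some(_.withBootstrapServers("localhost:9092").withGroupId("example-group")),
    configureCommitter = Some(_.withMaxBatch(10000L))
  )
}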
@@ -5,6 +5,7 @@ import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.SourceWithContext
import io.aiven.guardian.kafka.models.ReducedConsumerRecord

import scala.collection.immutable
import scala.concurrent.Future

trait KafkaClientInterface {
@@ -18,6 +19,10 @@ trait KafkaClientInterface {
*/
type Control

/** The type that represents the result of batching a `CursorContext`
*/
type BatchedCursorContext

/** @return
* A `SourceWithContext` that returns a Kafka Stream which automatically handles committing of cursors
*/
@@ -26,5 +31,13 @@ trait KafkaClientInterface {
/** @return
* A `Sink` that allows you to commit a `CursorContext` to Kafka to signify you have processed a message
*/
def commitCursor: Sink[CursorContext, Future[Done]]
def commitCursor: Sink[BatchedCursorContext, Future[Done]]

/** How to batch an immutable iterable of `CursorContext` into a `BatchedCursorContext`
* @param cursors
* The cursors that need to be batched
* @return
* A collection data structure that represents the batched cursors
*/
def batchCursorContext(cursors: immutable.Iterable[CursorContext]): BatchedCursorContext
}
57 changes: 50 additions & 7 deletions core/src/test/scala/io/aiven/guardian/kafka/Generators.scala
@@ -96,6 +96,24 @@ object Generators {

final case class KafkaDataWithTimePeriod(data: List[ReducedConsumerRecord], periodSlice: FiniteDuration)

def randomPeriodSliceBetweenMinMax(reducedConsumerRecords: List[ReducedConsumerRecord]): Gen[FiniteDuration] = {
val head = reducedConsumerRecords.head
val last = reducedConsumerRecords.last
Gen.choose[Long](head.timestamp, last.timestamp - 1).map(millis => FiniteDuration(millis, MILLISECONDS))
}

def kafkaDateGen(min: Int = 2,
max: Int = 100,
padTimestampsMillis: Int = 10,
condition: Option[List[ReducedConsumerRecord] => Boolean] = None
): Gen[List[ReducedConsumerRecord]] = for {
topic <- kafkaTopic
records <- {
val base = Generators.kafkaReducedConsumerRecordsGen(topic, min, max, padTimestampsMillis)
condition.fold(base)(c => Gen.listOfFillCond(c, base))
}
} yield records

/** Creates a generated dataset of Kafka events along with a time slice period using sensible values
* @param min
* The minimum number of `ReducedConsumerRecord`'s to generate. Defaults to 2.
@@ -107,16 +125,41 @@ object Generators {
*/
def kafkaDataWithTimePeriodsGen(min: Int = 2,
max: Int = 100,
padTimestampsMillis: Int = 10
padTimestampsMillis: Int = 10,
periodSliceFunction: List[ReducedConsumerRecord] => Gen[FiniteDuration] =
randomPeriodSliceBetweenMinMax,
condition: Option[List[ReducedConsumerRecord] => Boolean] = None
): Gen[KafkaDataWithTimePeriod] = for {
topic <- kafkaTopic
records <- Generators.kafkaReducedConsumerRecordsGen(topic, min, max, padTimestampsMillis)
head = records.head
last = records.last

duration <- Gen.choose[Long](head.timestamp, last.timestamp - 1).map(millis => FiniteDuration(millis, MILLISECONDS))
records <- kafkaDateGen(min, max, padTimestampsMillis, condition)
duration <- periodSliceFunction(records)
} yield KafkaDataWithTimePeriod(records, duration)

def reducedConsumerRecordsUntilSize(size: Long, toBytesFunc: List[ReducedConsumerRecord] => Array[Byte])(
reducedConsumerRecords: List[ReducedConsumerRecord]
): Boolean =
toBytesFunc(reducedConsumerRecords).length > size

def timePeriodAlwaysGreaterThanAllMessages(reducedConsumerRecords: List[ReducedConsumerRecord]): Gen[FiniteDuration] =
FiniteDuration(reducedConsumerRecords.last.timestamp + 1, MILLISECONDS)

final case class KafkaDataInChunksWithTimePeriod(data: List[List[ReducedConsumerRecord]], periodSlice: FiniteDuration)

/** @param size
* The minimum number of bytes
* @return
* `amount` lists of [[ReducedConsumerRecord]], each of which serializes to at least `size` bytes.
*/
def kafkaDataWithMinSizeGen(size: Long,
amount: Int,
toBytesFunc: List[ReducedConsumerRecord] => Array[Byte]
): Gen[KafkaDataInChunksWithTimePeriod] = {
val single = kafkaDateGen(1000, 10000, 10, Some(reducedConsumerRecordsUntilSize(size, toBytesFunc)))
for {
recordsSplitBySize <- Gen.sequence(List.fill(amount)(single)).map(_.asScala.toList)
duration <- timePeriodAlwaysGreaterThanAllMessages(recordsSplitBySize.flatten)
} yield KafkaDataInChunksWithTimePeriod(recordsSplitBySize, duration)
}

/** Generator for a valid Kafka topic that can be used in actual Kafka clusters
*/
lazy val kafkaTopic: Gen[String] = for {
@@ -7,6 +7,7 @@ import akka.stream.scaladsl.Source
import akka.stream.scaladsl.SourceWithContext
import io.aiven.guardian.kafka.models.ReducedConsumerRecord

import scala.collection.immutable
import scala.concurrent.Future
import scala.jdk.CollectionConverters._

@@ -34,6 +35,10 @@ class MockedKafkaClientInterface(kafkaData: Source[ReducedConsumerRecord, NotUse
*/
override type Control = Future[NotUsed]

/** The type that represents the result of batching a `CursorContext`
*/
override type BatchedCursorContext = Long

/** @return
* A `SourceWithContext` that returns a Kafka Stream which automatically handles committing of cursors
*/
@@ -49,4 +54,12 @@ class MockedKafkaClientInterface(kafkaData: Source[ReducedConsumerRecord, NotUse
*/
override def commitCursor: Sink[Long, Future[Done]] = Sink.foreach(cursor => committedOffsets ++ Iterable(cursor))

/** How to batch an immutable iterable of `CursorContext` into a `BatchedCursorContext`
* @param cursors
* The cursors that need to be batched
* @return
* A collection data structure that represents the batched cursors
*/
override def batchCursorContext(cursors: immutable.Iterable[Long]): Long = cursors.max

}
39 changes: 39 additions & 0 deletions core/src/test/scala/io/aiven/guardian/kafka/Utils.scala
@@ -1,11 +1,16 @@
package io.aiven.guardian.kafka

import akka.actor.ActorSystem
import org.apache.kafka.common.KafkaFuture

import scala.collection.immutable
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.jdk.DurationConverters._

import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit
import java.util.concurrent.CompletableFuture

object Utils {
@@ -42,4 +47,38 @@ object Utils {
}
}

final case class UnsupportedTimeUnit(chronoUnit: ChronoUnit) extends Exception(s"$chronoUnit not supported")

private def recurseUntilHitTimeUnit(previousChronoUnit: ChronoUnit, buffer: BigDecimal)(implicit
system: ActorSystem
): Future[Unit] = {
val now = OffsetDateTime.now()
val (current, max) = previousChronoUnit match {
case ChronoUnit.SECONDS =>
(now.getSecond, 59)
case ChronoUnit.MINUTES =>
(now.getMinute, 59)
case ChronoUnit.HOURS =>
(now.getHour, 23)
case ChronoUnit.DAYS =>
(now.getDayOfWeek.getValue - 1, 6)
case ChronoUnit.MONTHS =>
(now.getMonth.getValue - 1, 11)
case _ => throw UnsupportedTimeUnit(previousChronoUnit)
}

if (BigDecimal(current) / BigDecimal(max) * BigDecimal(100) <= buffer)
Future.successful(())
else
akka.pattern.after(previousChronoUnit.getDuration.toScala)(recurseUntilHitTimeUnit(previousChronoUnit, buffer))
}

def waitForStartOfTimeUnit(chronoUnit: ChronoUnit, buffer: BigDecimal = BigDecimal(5))(implicit
system: ActorSystem
): Future[Unit] = {
val allEnums = ChronoUnit.values()
val previousEnum = allEnums(chronoUnit.ordinal - 1)
recurseUntilHitTimeUnit(previousEnum, buffer)
}

}
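A hedged usage sketch of the helper above, as the suspend/resume test uses it: park the test until we are within the first ~5% (the default `buffer`) of a wall-clock minute so a `ChronoUnitSlice(ChronoUnit.MINUTES)` backup started right afterwards has nearly the whole minute ahead of it:

import akka.actor.ActorSystem
import io.aiven.guardian.kafka.Utils._

import java.time.temporal.ChronoUnit

object WaitForMinuteSketch extends App {
  implicit val system: ActorSystem = ActorSystem("wait-for-minute-sketch")
  import system.dispatcher

  waitForStartOfTimeUnit(ChronoUnit.MINUTES).foreach { _ =>
    println("start of a fresh minute, safe to kick off the backup")
    system.terminate()
  }
}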
