Add suspend and resume functionality
mdedetrich committed Jan 13, 2022
1 parent 4b9593f commit 6210752
Showing 19 changed files with 890 additions and 107 deletions.
@@ -1,5 +1,6 @@
package io.aiven.guardian.kafka.backup.gcs

import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentTypes
import akka.stream.alpakka.google.GoogleAttributes
import akka.stream.alpakka.google.GoogleSettings
@@ -15,22 +16,35 @@ import io.aiven.guardian.kafka.gcs.configs.{GCS => GCSConfig}
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

// TODO: The GCS implementation currently does not work correctly because the current Alpakka GCS implementation
// does not allow us to commit the Kafka cursor whenever chunks are uploaded
class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[GoogleSettings])(implicit
override val kafkaClientInterface: T,
override val backupConfig: Backup,
override val system: ActorSystem,
gcsConfig: GCSConfig
) extends BackupClientInterface[T] {

override def empty: () => Future[Option[StorageObject]] = () => Future.successful(None)

override type BackupResult = Option[StorageObject]

override type CurrentState = Nothing

override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)

override def backupToStorageSink(key: String,
currentState: Option[Nothing]
): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {
val base = GCStorage
.resumableUpload(gcsConfig.dataBucket, key, ContentTypes.`application/json`)
.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))

maybeGoogleSettings
.fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings)))
.contramap[(ByteString, kafkaClientInterface.CursorContext)] { case (byteString, _) =>
byteString
}
}

}
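The contramap at the end of backupToStorageSink is what the TODO above refers to: the Kafka cursor context is thrown away before the bytes reach Alpakka's resumableUpload, so there is no per-chunk point at which the cursor could be committed. Below is a minimal, self-contained sketch of that adaptation, assuming a toy Long cursor and a byte-counting sink in place of the real GCS upload sink; none of these names come from the commit.

// Sketch (not from this commit): the contramap pattern used by backupToStorageSink,
// with a byte-counting sink standing in for the storage sink and a Long standing in for the cursor.
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

object ContramapSketch extends App {
  implicit val system: ActorSystem = ActorSystem("contramap-sketch")
  import system.dispatcher

  // Stand-in for the storage sink: it only ever sees ByteString elements.
  val storageSink: Sink[ByteString, Future[Int]] =
    Sink.fold[Int, ByteString](0)((bytesSeen, bs) => bytesSeen + bs.length)

  // Adapt the sink to accept (ByteString, cursor) pairs by discarding the cursor.
  // Because the cursor never reaches the upload, there is no per-chunk point at which
  // it could be committed back to Kafka, which is the limitation the TODO describes.
  val contextAware: Sink[(ByteString, Long), Future[Int]] =
    storageSink.contramap[(ByteString, Long)] { case (bytes, _) => bytes }

  Source(List((ByteString("first"), 1L), (ByteString("second"), 2L)))
    .runWith(contextAware)
    .foreach { total =>
      println(s"uploaded $total bytes")
      system.terminate()
    }
}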
@@ -1,40 +1,168 @@
package io.aiven.guardian.kafka.backup.s3

import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.FailedUploadPart
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.alpakka.s3.Part
import akka.stream.alpakka.s3.S3Attributes
import akka.stream.alpakka.s3.S3Headers
import akka.stream.alpakka.s3.S3Settings
import akka.stream.alpakka.s3.SuccessfulUploadPart
import akka.stream.alpakka.s3.UploadPartResponse
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl._
import akka.util.ByteString
import com.typesafe.scalalogging.StrictLogging
import io.aiven.guardian.kafka.KafkaClientInterface
import io.aiven.guardian.kafka.backup.BackupClientInterface
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import java.time.Instant

final case class CurrentS3State(uploadId: String, parts: Seq[Part])

class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings])(implicit
override val kafkaClientInterface: T,
override val backupConfig: Backup,
override val system: ActorSystem,
s3Config: S3Config,
s3Headers: S3Headers
) extends BackupClientInterface[T]
with StrictLogging {

override def empty: () => Future[Option[MultipartUploadResult]] = () => Future.successful(None)

override type BackupResult = Option[MultipartUploadResult]

override type CurrentState = CurrentS3State

override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = {
implicit val ec: ExecutionContext = system.classicSystem.getDispatcher

val baseListMultipart = S3.listMultipartUpload(s3Config.dataBucket, None)

for {
incompleteUploads <-
maybeS3Settings
.fold(baseListMultipart)(s3Settings => baseListMultipart.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)
keys = incompleteUploads.filter(_.key == key)
result <- if (keys.isEmpty)
Future.successful(None)
else {
val listMultipartUploads = keys match {
case Seq(single) =>
logger.info(
s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
)
single
case rest =>
val last = rest.minBy(_.initiated)(Ordering[Instant].reverse)
logger.warn(
s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
)
last
}
val uploadId = listMultipartUploads.uploadId
val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)

for {
parts <- maybeS3Settings
.fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)

finalParts = parts.lastOption match {
case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize =>
parts
case _ =>
// We drop the last part here since it's broken
parts.dropRight(1)
}
} yield Some(CurrentS3State(uploadId, finalParts.map(_.toPart)))
}
} yield result

}

private[s3] def failureSink
: Sink[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] = Sink
.foreach[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (failedPart, _) =>
logger.warn(
s"Failed to upload a chunk into S3 with bucket: ${failedPart.multipartUpload.bucket}, key: ${failedPart.multipartUpload.key}, uploadId: ${failedPart.multipartUpload.uploadId} and partNumber: ${failedPart.partNumber}",
failedPart.exception
)
}

private[s3] def successSink
: Sink[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] =
kafkaClientInterface.commitCursor
.contramap[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])] { case (_, cursors) =>
kafkaClientInterface.batchCursorContext(cursors)
}

private[s3] def kafkaBatchSink
: Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] = {

val success = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
.collectType[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
.wireTap { data =>
val (part, _) = data
logger.info(
s"Committing kafka cursor for uploadId:${part.multipartUpload.uploadId} key: ${part.multipartUpload.key} and S3 part: ${part.partNumber}"
)
}
.toMat(successSink)(Keep.none)

val failure = Flow[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])]
.collectType[(FailedUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext])]
.toMat(failureSink)(Keep.none)

Sink.combine(success, failure)(Broadcast(_))
}

override def backupToStorageSink(key: String,
currentState: Option[CurrentS3State]
): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {

// Note that chunkingParallelism is pointless for this use case since we are directly streaming from Kafka.
// Furthermore, the real `KafkaClient` implementation uses `CommittableOffsetBatch`, which is a global singleton, so
// we can't have concurrent updates to this data structure.

val sink = currentState match {
case Some(state) =>
logger.info(
s"Resuming previous upload with uploadId: ${state.uploadId} and bucket: ${s3Config.dataBucket} with key: $key"
)
S3.resumeMultipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
s3Config.dataBucket,
key,
state.uploadId,
state.parts,
kafkaBatchSink,
s3Headers = s3Headers,
chunkingParallelism = 1
)
case None =>
logger.info(
s"Creating new upload with bucket: ${s3Config.dataBucket} and key: $key"
)
S3.multipartUploadWithHeadersAndContext[kafkaClientInterface.CursorContext](
s3Config.dataBucket,
key,
kafkaBatchSink,
s3Headers = s3Headers,
chunkingParallelism = 1
)
}

val base = sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
maybeS3Settings.fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
}
}
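For readers following the resume path, the two decisions inside getCurrentUploadState are easiest to see with plain data: pick the most recently initiated interrupted upload for the key, and discard a trailing part that is smaller than the minimum chunk size. The sketch below models exactly that with illustrative Upload/Part case classes and an assumed 5 MiB MinChunkSize; it is not the Alpakka API.

// Sketch (not from this commit): the selection logic from getCurrentUploadState, modelled
// with plain case classes instead of the Alpakka listing result types.
import java.time.Instant

object ResumeSelectionSketch extends App {
  final case class Upload(uploadId: String, key: String, initiated: Instant)
  final case class Part(partNumber: Int, size: Long)

  // Assumed value: S3 rejects multipart parts smaller than 5 MiB (except the final part).
  val MinChunkSize: Int = 5 * 1024 * 1024

  val interrupted = List(
    Upload("upload-1", "backup.json", Instant.parse("2022-01-10T00:00:00Z")),
    Upload("upload-2", "backup.json", Instant.parse("2022-01-12T00:00:00Z"))
  )

  // If several interrupted uploads exist for the same key, resume the most recently initiated one.
  val chosen = interrupted.minBy(_.initiated)(Ordering[Instant].reverse)
  println(chosen.uploadId) // upload-2

  // Drop a trailing part that is below the minimum chunk size: it was presumably cut short
  // when the upload was interrupted and cannot be reused as-is.
  val parts = List(Part(1, MinChunkSize.toLong), Part(2, 1024L))
  val reusable = parts.lastOption match {
    case Some(last) if last.size >= MinChunkSize => parts
    case _                                       => parts.dropRight(1)
  }
  println(reusable.map(_.partNumber)) // List(1)
}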
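The kafkaBatchSink above is the piece that ties chunk uploads to Kafka commits: every upload response is broadcast to two branches, and collectType keeps only successful parts in one branch (which commits the cursor) and only failed parts in the other (which logs and therefore leaves the cursor uncommitted, so the data gets retried). A small sketch of that routing pattern follows, with made-up Succeeded/Failed case classes standing in for Alpakka's UploadPartResponse subtypes.

// Sketch (not from this commit): broadcasting every element to two branches and letting
// collectType keep only the matching subtype in each, mirroring kafkaBatchSink.
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, Keep, Sink, Source}

object RoutingSketch extends App {
  implicit val system: ActorSystem = ActorSystem("routing-sketch")

  sealed trait Response
  final case class Succeeded(partNumber: Int)              extends Response
  final case class Failed(partNumber: Int, reason: String) extends Response

  // Successful parts would commit the Kafka cursor; here we just log.
  val successBranch = Flow[Response]
    .collectType[Succeeded]
    .toMat(Sink.foreach(s => println(s"commit cursor for part ${s.partNumber}")))(Keep.none)

  // Failed parts are only logged; their cursors are never committed.
  val failureBranch = Flow[Response]
    .collectType[Failed]
    .toMat(Sink.foreach(f => println(s"part ${f.partNumber} failed: ${f.reason}")))(Keep.none)

  val routed = Sink.combine(successBranch, failureBranch)(Broadcast(_))

  Source(List[Response](Succeeded(1), Failed(2, "timeout"), Succeeded(3))).runWith(routed)

  // The combined sink materializes NotUsed, so this demo simply gives the short stream a
  // moment to finish before shutting the actor system down.
  Thread.sleep(1000)
  system.terminate()
}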
@@ -0,0 +1,33 @@
package io.aiven.guardian.kafka.backup.s3

import akka.Done
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.S3Headers
import akka.stream.alpakka.s3.S3Settings
import akka.stream.alpakka.s3.SuccessfulUploadPart
import akka.stream.scaladsl.Sink
import io.aiven.guardian.kafka.KafkaClientInterface
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config}

import scala.collection.immutable
import scala.concurrent.Future

import java.util.concurrent.ConcurrentLinkedQueue

class BackupClientChunkState[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings])(implicit
override val kafkaClientInterface: T,
override val backupConfig: Backup,
override val system: ActorSystem,
s3Config: S3Config,
s3Headers: S3Headers
) extends BackupClient[T](maybeS3Settings) {
val processedChunks: ConcurrentLinkedQueue[SuccessfulUploadPart] = new ConcurrentLinkedQueue[SuccessfulUploadPart]()

override val successSink
: Sink[(SuccessfulUploadPart, immutable.Iterable[kafkaClientInterface.CursorContext]), Future[Done]] =
super.successSink.contramap { case (part, value) =>
processedChunks.add(part)
(part, value)
}
}
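BackupClientChunkState exists so the tests can later assert exactly which chunks made it through the success path: it wraps the parent's successSink and records each element before passing it on. Below is a stripped-down sketch of that wrap-and-record pattern, using plain Ints and Sink.ignore rather than upload parts and the cursor-committing sink; all names are illustrative.

// Sketch (not from this commit): recording every element that flows into an existing sink by
// wrapping it with contramap, the same trick BackupClientChunkState applies to successSink.
import java.util.concurrent.ConcurrentLinkedQueue

import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future

object ObservingSinkSketch extends App {
  implicit val system: ActorSystem = ActorSystem("observing-sketch")
  import system.dispatcher

  // Thread-safe because stream stages may run on different dispatcher threads.
  val processed = new ConcurrentLinkedQueue[Int]()

  val underlying: Sink[Int, Future[Done]] = Sink.ignore

  // Record the element, then pass it on unchanged to the wrapped sink.
  val observing: Sink[Int, Future[Done]] = underlying.contramap[Int] { element =>
    processed.add(element)
    element
  }

  Source(1 to 5).runWith(observing).foreach { _ =>
    println(processed) // [1, 2, 3, 4, 5]
    system.terminate()
  }
}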
@@ -1,7 +1,9 @@
package io.aiven.guardian.kafka.backup.s3

import akka.actor.Scheduler
import akka.stream.Attributes
import akka.stream.alpakka.s3.BucketAccess
import akka.stream.alpakka.s3.ListBucketResultContents
import akka.stream.alpakka.s3.S3Attributes
import akka.stream.alpakka.s3.S3Settings
import akka.stream.alpakka.s3.scaladsl.S3
@@ -14,6 +16,7 @@ import com.softwaremill.diffx.scalatest.DiffMatcher.matchTo
import com.typesafe.scalalogging.StrictLogging
import io.aiven.guardian.akka.AkkaHttpTestKit
import io.aiven.guardian.kafka.Generators._
import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst
import io.aiven.guardian.kafka.codecs.Circe._
import io.aiven.guardian.kafka.models.ReducedConsumerRecord
import io.aiven.guardian.kafka.s3.Config
@@ -49,7 +52,9 @@ trait BackupClientSpec
with StrictLogging {

implicit val ec: ExecutionContext = system.dispatcher
implicit val defaultPatience: PatienceConfig = PatienceConfig(5 minutes, 100 millis)
implicit override val generatorDrivenConfig: PropertyCheckConfiguration =
PropertyCheckConfiguration(minSuccessful = 1, minSize = 1)

val ThrottleElements: Int = 100
val ThrottleAmount: FiniteDuration = 1 millis
@@ -128,13 +133,41 @@
case None => ()
}

/** @param dataBucket
* Which S3 bucket the objects are being persisted into
* @param transformResult
* A function that transforms the download result from S3 into the data `T` that you need. Note that you can also
* throw an exception in this transform function to trigger a retry (i.e. using it as an additional predicate)
* @param attempts
* Total number of attempts
* @param delay
* The delay between each attempt after the first
* @tparam T
* Type of the final result transformed by `transformResult`
* @return
*   A `Future` containing the transformed listing result once one of the attempts succeeds
*/
def waitForS3Download[T](dataBucket: String,
transformResult: Seq[ListBucketResultContents] => T,
attempts: Int = 10,
delay: FiniteDuration = 1 second
): Future[T] = {
implicit val scheduler: Scheduler = system.scheduler

val attempt = () =>
S3.listBucket(dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq).map {
transformResult
}

akka.pattern.retry(attempt, attempts, delay)
}

property("backup method completes flow correctly for all valid Kafka events") {
forAll(kafkaDataWithTimePeriodsGen(), s3ConfigGen(useVirtualDotHost, bucketPrefix)) {
(kafkaDataWithTimePeriod: KafkaDataWithTimePeriod, s3Config: S3Config) =>
logger.info(s"Data bucket is ${s3Config.dataBucket}")
val backupClient = new MockedS3BackupClientInterface(
Source(kafkaDataWithTimePeriod.data).throttle(ThrottleElements, ThrottleAmount),
PeriodFromFirst(kafkaDataWithTimePeriod.periodSlice),
s3Config,
Some(s3Settings)
)
@@ -171,7 +204,7 @@
})
keysWithRecords <- Future.sequence(keysWithSource.map { case (key, source) =>
source
.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
.toMat(Sink.collection)(Keep.right)
.run()
.map(list => (key, list.flatten))
@@ -181,7 +214,9 @@
OffsetDateTime.parse(date).toEpochSecond
}(Ordering[Long].reverse)
flattened = sorted.flatMap { case (_, records) => records }
} yield flattened.collect { case Some(reducedConsumerRecord) =>
reducedConsumerRecord
}
val observed = calculatedFuture.futureValue

kafkaDataWithTimePeriod.data.containsSlice(observed) mustEqual true
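The waitForS3Download helper above leans on akka.pattern.retry and on the fact that transformResult may throw to force another polling attempt. The standalone sketch below shows that retry-as-predicate idea with an in-memory list standing in for the S3 bucket listing; the object name, delays, and attempt counts are illustrative only.

// Sketch (not from this commit): retrying until a transform stops throwing, which is how
// waitForS3Download treats transformResult as an extra predicate.
import akka.actor.ActorSystem

import scala.concurrent.Future
import scala.concurrent.duration._

object RetrySketch extends App {
  implicit val system: ActorSystem = ActorSystem("retry-sketch")
  import system.dispatcher
  implicit val scheduler: akka.actor.Scheduler = system.scheduler

  @volatile var bucketContents: List[String] = Nil

  // Simulate the backup object appearing in the bucket a little later.
  system.scheduler.scheduleOnce(2.seconds)(bucketContents = List("2022-01-13T00:00:00Z.json"))

  val attempt = () =>
    Future {
      // Throwing here makes akka.pattern.retry schedule another attempt after the delay.
      if (bucketContents.isEmpty) throw new IllegalStateException("bucket still empty")
      bucketContents
    }

  akka.pattern.retry(attempt, 10, 500.millis).foreach { keys =>
    println(s"download ready: $keys")
    system.terminate()
  }
}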
@@ -0,0 +1,27 @@
package io.aiven.guardian.kafka.backup.s3

import akka.actor.ActorSystem
import akka.kafka.CommitterSettings
import akka.kafka.ConsumerMessage
import akka.kafka.ConsumerSettings
import akka.kafka.scaladsl.Consumer
import akka.stream.SharedKillSwitch
import akka.stream.scaladsl.SourceWithContext
import io.aiven.guardian.kafka.KafkaClient
import io.aiven.guardian.kafka.configs.KafkaCluster
import io.aiven.guardian.kafka.models.ReducedConsumerRecord

class KafkaClientWithKillSwitch(
configureConsumer: Option[
ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]
] = None,
configureCommitter: Option[
CommitterSettings => CommitterSettings
] = None,
killSwitch: SharedKillSwitch
)(implicit system: ActorSystem, kafkaClusterConfig: KafkaCluster)
extends KafkaClient(configureConsumer, configureCommitter) {
override def getSource
: SourceWithContext[ReducedConsumerRecord, ConsumerMessage.CommittableOffset, Consumer.Control] =
super.getSource.via(killSwitch.flow)
}
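KafkaClientWithKillSwitch simply threads a SharedKillSwitch into the consumer source so the suspend/resume tests can cut the stream mid-backup. The sketch below shows the same killSwitch.flow wiring on a ticking source; everything apart from the .via(killSwitch.flow) line is an illustrative stand-in.

// Sketch (not from this commit): stopping a running stream from the outside with a
// SharedKillSwitch, the same wiring KafkaClientWithKillSwitch adds to getSource.
import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._

object KillSwitchSketch extends App {
  implicit val system: ActorSystem = ActorSystem("kill-switch-sketch")
  import system.dispatcher

  val killSwitch = KillSwitches.shared("backup-kill-switch")

  // Stand-in for the Kafka source: one "record" every 100 milliseconds.
  val done = Source
    .tick(0.millis, 100.millis, "record")
    .via(killSwitch.flow) // the same .via(killSwitch.flow) used in getSource above
    .runWith(Sink.foreach(record => println(s"backing up $record")))

  // Later, shut the stream down as if the backup were suspended mid-flight.
  system.scheduler.scheduleOnce(1.second)(killSwitch.shutdown())

  done.onComplete { result =>
    println(s"stream finished: $result")
    system.terminate()
  }
}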
