Fix handling for suspend/resume case for PeriodFromFirst
mdedetrich committed Jan 25, 2022
1 parent bd9b3b4 commit 53f82d3
Showing 5 changed files with 344 additions and 85 deletions.
File 1 of 5: BackupClient (GCS)
@@ -29,9 +29,21 @@ class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[Google

override type BackupResult = Option[StorageObject]

override type CurrentState = Nothing
override type State = Nothing

override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)
override def getCurrentUploadState(key: String): Future[UploadStateResult] =
Future.successful(UploadStateResult.empty)

override def backupToStorageTerminateSink(
previousState: PreviousState
): Sink[ByteString, Future[Option[StorageObject]]] = {
val base = GCStorage
.resumableUpload(gcsConfig.dataBucket, previousState.previousKey, ContentTypes.`application/json`)
.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))

maybeGoogleSettings
.fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings)))
}

override def backupToStorageSink(key: String,
currentState: Option[Nothing]
@@ -46,5 +58,4 @@ class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[Google
byteString
}
}

}
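
Below is a minimal sketch, not part of this commit, of how the new GCS terminate sink can be driven: the "null]" terminator defined later in BackupClientInterface is streamed into a resumable upload against the previous key. The bucket and object names are made up, and an implicit ActorSystem with configured Google credentials is assumed.

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentTypes
import akka.stream.alpakka.googlecloud.storage.StorageObject
import akka.stream.alpakka.googlecloud.storage.scaladsl.GCStorage
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

object GcsTerminateSketch extends App {
  implicit val system: ActorSystem = ActorSystem("gcs-terminate-sketch")

  // The terminator that BackupClientInterface runs through backupToStorageTerminateSink
  val terminator: Source[ByteString, NotUsed] = Source.single(ByteString("null]"))

  // Hypothetical bucket/key standing in for gcsConfig.dataBucket and previousState.previousKey
  val terminateSink: Sink[ByteString, Future[StorageObject]] =
    GCStorage.resumableUpload("example-backup-bucket", "example-previous-key.json", ContentTypes.`application/json`)

  terminator.runWith(terminateSink)
}
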
File 2 of 5: BackupClient (S3)
@@ -4,6 +4,7 @@ import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.FailedUploadPart
import akka.stream.alpakka.s3.ListMultipartUploadResultUploads
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.alpakka.s3.Part
import akka.stream.alpakka.s3.S3Attributes
@@ -40,10 +41,55 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings

override type BackupResult = Option[MultipartUploadResult]

override type CurrentState = CurrentS3State
override type State = CurrentS3State

private def extractStateFromUpload(keys: Seq[ListMultipartUploadResultUploads], current: Boolean)(implicit
executionContext: ExecutionContext
): Future[Some[(CurrentS3State, String)]] = {
val listMultipartUploads = keys match {
case Seq(single) =>
        if (current)
          logger.info(
            s"Found current uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
          )
        else
          logger.info(
            s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
          )
single
case rest =>
val last = rest.maxBy(_.initiated)(Ordering[Instant])
if (current)
logger.warn(
s"Found multiple currently cancelled uploads for key: ${last.key} and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
)
else
logger.warn(
s"Found multiple previously cancelled uploads for key: ${last.key} and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
)
last
}
val uploadId = listMultipartUploads.uploadId
val key = listMultipartUploads.key
val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)

override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = {
implicit val ec: ExecutionContext = system.classicSystem.getDispatcher
for {
parts <- maybeS3Settings
.fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)

finalParts = parts.lastOption match {
case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize =>
parts
case _ =>
// We drop the last part here since it's broken
parts.dropRight(1)
}
} yield Some((CurrentS3State(uploadId, finalParts.map(_.toPart)), key))
}

def getCurrentUploadState(key: String): Future[UploadStateResult] = {
implicit val ec: ExecutionContext = system.dispatcher

val baseListMultipart = S3.listMultipartUpload(s3Config.dataBucket, None)

@@ -52,41 +98,19 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings
maybeS3Settings
.fold(baseListMultipart)(s3Settings => baseListMultipart.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)
keys = incompleteUploads.filter(_.key == key)
result <- if (keys.isEmpty)
Future.successful(None)
else {
val listMultipartUploads = keys match {
case Seq(single) =>
logger.info(
s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
)
single
case rest =>
val last = rest.maxBy(_.initiated)(Ordering[Instant])
logger.warn(
s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
)
last
}
val uploadId = listMultipartUploads.uploadId
val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)

for {
parts <- maybeS3Settings
.fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)

finalParts = parts.lastOption match {
case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize =>
parts
case _ =>
// We drop the last part here since its broken
parts.dropRight(1)
}
} yield Some(CurrentS3State(uploadId, finalParts.map(_.toPart)))
}
} yield result
(currentKeys, previousKeys) = incompleteUploads.partition(_.key == key)
current <- if (currentKeys.nonEmpty)
extractStateFromUpload(currentKeys, current = true)
else
Future.successful(None)
previous <- if (previousKeys.nonEmpty)
extractStateFromUpload(previousKeys, current = false)
else
Future.successful(None)

} yield UploadStateResult(current.map(_._1),
previous.map { case (state, previousKey) => PreviousState(state, previousKey) }
)

}
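
A simplified, self-contained sketch, not part of this commit, of the split performed above: incomplete multipart uploads are partitioned into those for the key currently being written and those left over from an earlier key, and when several aborted uploads exist the most recently initiated one is picked. IncompleteUpload is a hypothetical stand-in for Alpakka's ListMultipartUploadResultUploads.

import java.time.Instant

object UploadSplitSketch {
  // Hypothetical stand-in for akka.stream.alpakka.s3.ListMultipartUploadResultUploads
  final case class IncompleteUpload(key: String, uploadId: String, initiated: Instant)

  def splitUploads(uploads: Seq[IncompleteUpload],
                   currentKey: String
  ): (Option[IncompleteUpload], Option[IncompleteUpload]) = {
    // Uploads for the key in progress are "current"; anything else belongs to an older key
    val (currentUploads, previousUploads) = uploads.partition(_.key == currentKey)

    // Mirrors the maxBy(_.initiated) selection above: the most recently initiated upload wins
    def latest(candidates: Seq[IncompleteUpload]): Option[IncompleteUpload] =
      if (candidates.isEmpty) None else Some(candidates.maxBy(_.initiated)(Ordering[Instant]))

    (latest(currentUploads), latest(previousUploads))
  }
}
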

@@ -126,6 +150,28 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings
Sink.combine(success, failure)(Broadcast(_))
}

override def backupToStorageTerminateSink(
previousState: PreviousState
): Sink[ByteString, Future[BackupResult]] = {
logger.info(
s"Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId:${previousState.state.uploadId}"
)
val sink = S3
.resumeMultipartUploadWithHeaders(
s3Config.dataBucket,
previousState.previousKey,
previousState.state.uploadId,
previousState.state.parts,
s3Headers = s3Headers,
chunkingParallelism = 1
)

val base = sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))

maybeS3Settings
.fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
}

override def backupToStorageSink(key: String,
currentState: Option[CurrentS3State]
): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {
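
The resume logic above also drops a trailing part that is smaller than S3.MinChunkSize, treating it as a chunk that was only partially flushed when the previous run was aborted. A simplified sketch of that rule, not part of this commit; ListedPart is a hypothetical stand-in for the part descriptors returned by S3.listParts, and the constant mirrors akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize (5 MiB).

object TrailingPartSketch {
  // Hypothetical stand-in for the parts returned by S3.listParts
  final case class ListedPart(partNumber: Int, size: Long)

  // Mirrors akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize (5 MiB)
  val MinChunkSize: Long = 5L * 1024 * 1024

  // Keep all parts only if the last one reached the minimum chunk size;
  // otherwise drop it since it is assumed to be broken/partially written.
  def usableParts(parts: Seq[ListedPart]): Seq[ListedPart] =
    parts.lastOption match {
      case Some(last) if last.size >= MinChunkSize => parts
      case _                                       => parts.dropRight(1)
    }
}
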
File 3 of 5: RealS3BackupClientSpec
@@ -149,6 +149,15 @@ class RealS3BackupClientSpec
}
)

def getKeysFromTwoDownloads(dataBucket: String): Future[(String, String)] = waitForS3Download(
dataBucket,
{
case Seq(first, second) => (first.key, second.key)
case rest =>
throw DownloadNotReady(rest)
}
)

def waitUntilBackupClientHasCommitted(backupClient: BackupClientChunkState[_],
step: FiniteDuration = 100 millis,
delay: FiniteDuration = 5 seconds
@@ -158,7 +167,7 @@ class RealS3BackupClientSpec
else
akka.pattern.after(step)(waitUntilBackupClientHasCommitted(backupClient, step, delay))

property("entire flow works properly from start to end") {
property("basic flow without interruptions using PeriodFromFirst works correctly") {
forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix)
) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
@@ -242,7 +251,142 @@ class RealS3BackupClientSpec
}
}

property("suspend/resume works correctly") {
property("suspend/resume using PeriodFromFirst creates separate object after resume point") {
forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix)
) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
logger.info(s"Data bucket is ${s3Config.dataBucket}")

val data = kafkaDataInChunksWithTimePeriod.data.flatten

val topics = data.map(_.topic).toSet

implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics)

implicit val config: S3Config = s3Config
implicit val backupConfig: Backup = Backup(PeriodFromFirst(1 minute))

val producerSettings = createProducer()

val killSwitch = KillSwitches.shared("kill-switch")

val backupClient =
new BackupClientChunkState(Some(s3Settings))(createKafkaClient(killSwitch),
implicitly,
implicitly,
implicitly,
implicitly
)

val asProducerRecords = toProducerRecords(data)
val baseSource = toSource(asProducerRecords, 30 seconds)

val adminClient = AdminClient.create(
Map[String, AnyRef](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> container.bootstrapServers
).asJava
)

val createTopics = adminClient.createTopics(topics.map { topic =>
new NewTopic(topic, 1, 1.toShort)
}.asJava)

val calculatedFuture = for {
_ <- createTopics.all().toCompletableFuture.asScala
_ <- createBucket(s3Config.dataBucket)
_ = backupClient.backup.run()
_ = baseSource.runWith(Producer.plainSink(producerSettings))
_ <- waitUntilBackupClientHasCommitted(backupClient)
_ = killSwitch.abort(TerminationException)
secondBackupClient <- akka.pattern.after(2 seconds) {
Future {
new BackupClient(Some(s3Settings))(
new KafkaClient(configureConsumer = baseKafkaConfig),
implicitly,
implicitly,
implicitly,
implicitly
)
}
}
_ = secondBackupClient.backup.run()
_ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head)
(firstKey, secondKey) <- getKeysFromTwoDownloads(s3Config.dataBucket)
firstDownloaded <- S3.download(s3Config.dataBucket, firstKey)
.withAttributes(s3Attrs)
.runWith(Sink.head)
.flatMap {
case Some((downloadSource, _)) =>
downloadSource
.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
.runWith(Sink.seq)
case None =>
throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key")
}
secondDownloaded <- S3.download(s3Config.dataBucket, secondKey)
.withAttributes(s3Attrs)
.runWith(Sink.head)
.flatMap {
case Some((downloadSource, _)) =>
downloadSource
.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
.runWith(Sink.seq)
case None =>
throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key")
}

} yield {
val first = firstDownloaded.toList.flatten.collect { case Some(reducedConsumerRecord) =>
reducedConsumerRecord
}

val second = secondDownloaded.toList.flatten.collect { case Some(reducedConsumerRecord) =>
reducedConsumerRecord
}
(first, second)
}

val (firstDownloaded, secondDownloaded) = calculatedFuture.futureValue

// Only care about ordering when it comes to key
val firstDownloadedGroupedAsKey = firstDownloaded
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

val secondDownloadedGroupedAsKey = secondDownloaded
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

val inputAsKey = data
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

val downloaded = (firstDownloadedGroupedAsKey.keySet ++ secondDownloadedGroupedAsKey.keySet).map { key =>
(key,
firstDownloadedGroupedAsKey.getOrElse(key, List.empty) ++ secondDownloadedGroupedAsKey.getOrElse(key,
List.empty
)
)
}.toMap

downloaded mustMatchTo inputAsKey

}
}

property("suspend/resume for same object using ChronoUnitSlice works correctly") {
forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix)
) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
@@ -260,10 +404,10 @@ class RealS3BackupClientSpec

val producerSettings = createProducer()

val firstKillSwitch = KillSwitches.shared("first-kill-switch")
val killSwitch = KillSwitches.shared("kill-switch")

val backupClient =
new BackupClientChunkState(Some(s3Settings))(createKafkaClient(firstKillSwitch),
new BackupClientChunkState(Some(s3Settings))(createKafkaClient(killSwitch),
implicitly,
implicitly,
implicitly,
@@ -290,7 +434,7 @@ class RealS3BackupClientSpec
_ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES)
_ = baseSource.runWith(Producer.plainSink(producerSettings))
_ <- waitUntilBackupClientHasCommitted(backupClient)
_ = firstKillSwitch.abort(TerminationException)
_ = killSwitch.abort(TerminationException)
secondBackupClient <- akka.pattern.after(2 seconds) {
Future {
new BackupClient(Some(s3Settings))(
@@ -303,7 +447,7 @@ class RealS3BackupClientSpec
}
}
_ = secondBackupClient.backup.run()
_ <- sendTopicAfterTimePeriod(1 minutes, producerSettings, topics.head)
_ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head)
key <- getKeyFromSingleDownload(s3Config.dataBucket)
downloaded <- S3.download(s3Config.dataBucket, key)
.withAttributes(s3Attrs)
File 4 of 5: BackupClientInterface
@@ -4,6 +4,7 @@ import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.util.ByteString
import com.typesafe.scalalogging.StrictLogging
import io.aiven.guardian.kafka.Errors
import io.aiven.guardian.kafka.backup.configs.Backup
import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice
@@ -26,7 +27,7 @@ import java.time.temporal._
* @tparam T
* The underlying `kafkaClientInterface` type
*/
trait BackupClientInterface[T <: KafkaClientInterface] {
trait BackupClientInterface[T <: KafkaClientInterface] extends StrictLogging {
implicit val kafkaClientInterface: T
implicit val backupConfig: Backup
implicit val system: ActorSystem
@@ -54,36 +55,61 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
override val context: kafkaClientInterface.CursorContext
) extends ByteStringElement

case class PreviousState(state: State, previousKey: String)
case class UploadStateResult(current: Option[State], previous: Option[PreviousState])
object UploadStateResult {
val empty: UploadStateResult = UploadStateResult(None, None)
}

/** Override this type to define the result of backing up data to a datasource
*/
type BackupResult

/** Override this type to define the result of calculating the previous state (if it exists)
*/
type CurrentState
type State

import BackupClientInterface._

/** Override this method to define how to retrieve the current state of a backup.
/** Override this method to define how to retrieve the current state of any unfinished backups.
* @param key
* The object key or filename for what is being backed up
* The object key or filename for what is currently being backed up
* @return
* A [[Future]] with a [[UploadStateResult]] data structure that optionally contains the state associated with
* `key` along with the latest state prior to `key` (if one exists)
*/
def getCurrentUploadState(key: String): Future[UploadStateResult]

/** A sink that is executed whenever a previously existing backup needs to be terminated and closed. Generally
* speaking this [[Sink]] is similar to [[backupToStorageSink]] except that
* [[kafkaClientInterface.CursorContext]] is not required since no Kafka messages are being written.
*
* Note that "terminate" refers to the fact that this [[Sink]] is executed with a [[Source]] containing a single
* `"null]"` [[ByteString]]; when written to an already existing unfinished backup it terminates the containing
* JSON array so that the object becomes valid, parsable JSON.
* @param previousState
* A data structure containing both the [[State]] along with the associated key which you can refer to in order to
* define your [[Sink]]
* @return
* An optional [[Future]] that contains the state if it found a previously aborted backup. Return [[None]] if if
* its a brand new backup.
* A [[Sink]] that points to an existing key defined by `previousState.previousKey`
*/
def getCurrentUploadState(key: String): Future[Option[CurrentState]]
def backupToStorageTerminateSink(previousState: PreviousState): Sink[ByteString, Future[BackupResult]]
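
As an illustration only, and not part of this commit, a file-system backed implementation of this sink could simply append whatever it receives (in practice the "null]" terminator) to the object identified by previousState.previousKey, here treated as a local file path; BackupResult would then be akka.stream.IOResult.

import java.nio.file.{Paths, StandardOpenOption}

import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Sink}
import akka.util.ByteString

import scala.concurrent.Future

object FileTerminateSinkSketch {
  // Hypothetical file-based terminate sink: append the received bytes to the existing
  // (unfinished) backup file named by the previous key.
  def fileTerminateSink(previousKey: String): Sink[ByteString, Future[IOResult]] =
    FileIO.toPath(Paths.get(previousKey), Set(StandardOpenOption.WRITE, StandardOpenOption.APPEND))
}
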

/** Override this method to define how to backup a `ByteString` combined with Kafka
* `kafkaClientInterface.CursorContext` to a `DataSource`
* @param key
* The object key or filename for what is being backed up
* @param currentState
* The current state if it exists. This is used when resuming from a previously aborted backup
* The current state if it exists. If this is empty then a new backup is being created with the associated `key`;
* otherwise, if this contains a [[State]], the defined [[Sink]] needs to handle resuming a previously
* unfinished backup for that `key` by directly appending the [[ByteString]] data.
* @return
* A Sink that also provides a `BackupResult`
* A [[Sink]] that, given a [[ByteString]] (containing a single Kafka [[ReducedConsumerRecord]]) along with its
* [[kafkaClientInterface.CursorContext]], backs up the data to your data storage. The [[Sink]] is also responsible
* for executing [[kafkaClientInterface.commitCursor]] when the data is successfully backed up.
*/
def backupToStorageSink(key: String,
currentState: Option[CurrentState]
currentState: Option[State]
): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]]

/** Override this method to define a zero value that covers the case that occurs immediately when `SubFlow` has been
@@ -270,29 +296,57 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
}
}

private[backup] val terminateSource: Source[ByteString, NotUsed] =
Source.single(ByteString("null]"))

/** Prepares the sink before it gets handed to `backupToStorageSink`
*/
private[backup] def prepareStartOfStream(state: Option[CurrentState],
private[backup] def prepareStartOfStream(uploadStateResult: UploadStateResult,
start: Start
): Sink[ByteStringElement, Future[BackupResult]] =
if (state.isDefined)
Flow[ByteStringElement]
.flatMapPrefix(1) {
case Seq(byteStringElement: Start) =>
val withoutStartOfJsonArray = byteStringElement.data.drop(1)
Flow[ByteStringElement].prepend(
Source.single(byteStringElement.copy(data = withoutStartOfJsonArray))
(uploadStateResult.previous, uploadStateResult.current) match {
case (Some(previous), None) =>
backupConfig.timeConfiguration match {
case _: PeriodFromFirst =>
backupToStorageSink(start.key, None)
.contramap[ByteStringElement] { byteStringElement =>
(byteStringElement.data, byteStringElement.context)
}
case _: ChronoUnitSlice =>
logger.warn(
s"Detected previous backup using PeriodFromFirst however current configuration is now changed to ChronoUnitSlice. Object/file with an older key: ${start.key} may contain newer events than object/file with newer key: ${previous.previousKey}"
)
case _ => throw Errors.ExpectedStartOfSource
backupToStorageSink(start.key, None)
.contramap[ByteStringElement] { byteStringElement =>
(byteStringElement.data, byteStringElement.context)
}
}
.toMat(backupToStorageSink(start.key, state).contramap[ByteStringElement] { byteStringElement =>
(byteStringElement.data, byteStringElement.context)
})(Keep.right)
else
backupToStorageSink(start.key, state)
.contramap[ByteStringElement] { byteStringElement =>
(byteStringElement.data, byteStringElement.context)
case (None, Some(current)) =>
backupConfig.timeConfiguration match {
case _: PeriodFromFirst =>
throw Errors.UnhandledStreamCase(List(current))
case _: ChronoUnitSlice =>
Flow[ByteStringElement]
.flatMapPrefix(1) {
case Seq(byteStringElement: Start) =>
val withoutStartOfJsonArray = byteStringElement.data.drop(1)
Flow[ByteStringElement].prepend(
Source.single(byteStringElement.copy(data = withoutStartOfJsonArray))
)
case _ => throw Errors.ExpectedStartOfSource
}
.toMat(backupToStorageSink(start.key, Some(current)).contramap[ByteStringElement] { byteStringElement =>
(byteStringElement.data, byteStringElement.context)
})(Keep.right)
}
case (None, None) =>
backupToStorageSink(start.key, None)
.contramap[ByteStringElement] { byteStringElement =>
(byteStringElement.data, byteStringElement.context)
}
case (Some(previous), Some(current)) =>
throw Errors.UnhandledStreamCase(List(previous.state, current))
}

/** The entire flow that involves reading from Kafka, transforming the data into JSON and then backing it up into a
* data source.
@@ -357,10 +411,15 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
Sink.lazyInit(
{
case start: Start =>
implicit val ec: ExecutionContext = system.getDispatcher
implicit val ec: ExecutionContext = system.dispatcher
for {
state <- getCurrentUploadState(start.key)
} yield prepareStartOfStream(state, start)
uploadStateResult <- getCurrentUploadState(start.key)
_ <- (uploadStateResult.previous, uploadStateResult.current) match {
case (Some(previous), None) =>
terminateSource.runWith(backupToStorageTerminateSink(previous)).map(Some.apply)
case _ => Future.successful(None)
}
} yield prepareStartOfStream(uploadStateResult, start)
case _ => throw Errors.ExpectedStartOfSource
},
empty
File 5 of 5: MockedBackupClientInterface
@@ -82,19 +82,18 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka
*/
override type BackupResult = Done

override type CurrentState = Nothing
override type State = Nothing

override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)
override def getCurrentUploadState(key: String): Future[UploadStateResult] =
Future.successful(UploadStateResult.empty)

override def empty: () => Future[Done] = () => Future.successful(Done)

/** Override this method to define how to backup a `ByteString` to a `DataSource`
*
* @param key
* The object key or filename for what is being backed up
* @return
* A Sink that also provides a `BackupResult`
*/
override def backupToStorageTerminateSink(previousState: PreviousState): Sink[ByteString, Future[Done]] =
Sink.foreach[ByteString] { byteString =>
backedUpData.add((previousState.previousKey, byteString))
}

override def backupToStorageSink(key: String,
currentState: Option[Nothing]
): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[Done]] =
