Fix handling for suspend/resume case for PeriodFromFirst
mdedetrich committed Jan 21, 2022
1 parent 241eb33 commit b92c57b
Showing 5 changed files with 293 additions and 67 deletions.
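
The change below makes a datasource report not only the recovered state of an interrupted upload but also whether that state belongs to the object currently being written (`Current`) or to an earlier object (`Previous`). With `PeriodFromFirst` the resumed client must not append to the previous period's object: it terminates and completes that upload via the new `terminateAndCompletePreviousBackup` and starts a fresh object, while `ChronoUnitSlice` (or state for the current key) keeps resuming the same object. The sketch below mirrors the `(timeConfiguration, whichState)` match added in `BackupClientInterface`; the stand-in types and the `decideAction` helper are illustrative only and are not part of the commit.

object ResumeDecision {
  // Illustrative stand-ins for the real TimeConfiguration and WhichState types;
  // PeriodFromFirst/ChronoUnitSlice are simplified to case objects here.
  sealed trait TimeConfiguration
  case object PeriodFromFirst extends TimeConfiguration
  case object ChronoUnitSlice extends TimeConfiguration

  sealed trait WhichState
  final case class Previous(key: String) extends WhichState
  case object Current extends WhichState

  sealed trait Action
  case object StartFreshObject     extends Action // terminate and complete the previous upload, then start a new key
  case object ResumeExistingUpload extends Action // keep appending to the same object, dropping the duplicate '['
  case object StartNewBackup       extends Action // no prior state was found

  // Hypothetical helper summarising how the backup flow and prepareStartOfStream now branch.
  def decideAction(timeConfiguration: TimeConfiguration, maybeState: Option[WhichState]): Action =
    (timeConfiguration, maybeState) match {
      case (PeriodFromFirst, Some(Previous(_))) => StartFreshObject
      case (_, Some(_))                         => ResumeExistingUpload
      case (_, None)                            => StartNewBackup
    }
}

In the S3 client this termination step resumes the interrupted multipart upload for the previous key, appends the closing `null]` fragment from `terminateSource`, and completes the upload before the new object for the current period is started.
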
@@ -29,7 +29,10 @@ class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[Google

override type BackupResult = Option[StorageObject]

override type CurrentState = Nothing
override type State = Nothing

override def terminateAndCompletePreviousBackup(state: Nothing, previousKey: String): Future[Option[StorageObject]] =
Future.successful(None)

override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)

@@ -4,6 +4,7 @@ import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.FailedUploadPart
import akka.stream.alpakka.s3.ListMultipartUploadResultUploads
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.alpakka.s3.Part
import akka.stream.alpakka.s3.S3Attributes
@@ -40,9 +41,80 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings

override type BackupResult = Option[MultipartUploadResult]

override type CurrentState = CurrentS3State
override type State = CurrentS3State

override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = {
override def terminateAndCompletePreviousBackup(state: CurrentS3State, previousKey: String): Future[BackupResult] = {
logger.info(s"Terminating and completing previous backup with key: $previousKey and uploadId:${state.uploadId}")
val base = S3.resumeMultipartUploadWithHeaders(
s3Config.dataBucket,
previousKey,
state.uploadId,
state.parts,
s3Headers = s3Headers,
chunkingParallelism = 1
)

val sink = maybeS3Settings.fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))

terminateSource.runWith(
sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))
)
}

private def extractStateFromUpload(keys: Seq[ListMultipartUploadResultUploads], current: Boolean)(implicit
executionContext: ExecutionContext
): Future[Some[(CurrentS3State, WhichState)]] = {
val listMultipartUploads = keys match {
case Seq(single) =>
if (current)
logger.info(
s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
)
else
logger.info(
s"Found current uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
)
single
case rest =>
val last = rest.maxBy(_.initiated)(Ordering[Instant])
if (current)
logger.warn(
s"Found multiple currently cancelled uploads for key: ${last.key} and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
)
else
logger.warn(
s"Found multiple previously cancelled uploads for key: ${last.key} and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
)
last
}
val uploadId = listMultipartUploads.uploadId
val key = listMultipartUploads.key
val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)

for {
parts <- maybeS3Settings
.fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)

finalParts = parts.lastOption match {
case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize =>
parts
case _ =>
          // We drop the last part here since it's broken
parts.dropRight(1)
}
} yield {
val whichState =
if (current)
Current
else
Previous(key)

Some((CurrentS3State(uploadId, finalParts.map(_.toPart)), whichState))
}
}

def getCurrentUploadState(key: String): Future[Option[(CurrentS3State, WhichState)]] = {
implicit val ec: ExecutionContext = system.classicSystem.getDispatcher

val baseListMultipart = S3.listMultipartUpload(s3Config.dataBucket, None)
@@ -52,40 +124,14 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings
maybeS3Settings
.fold(baseListMultipart)(s3Settings => baseListMultipart.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)
keys = incompleteUploads.filter(_.key == key)
result <- if (keys.isEmpty)
Future.successful(None)
else {
val listMultipartUploads = keys match {
case Seq(single) =>
logger.info(
s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
)
single
case rest =>
val last = rest.maxBy(_.initiated)(Ordering[Instant])
logger.warn(
s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
)
last
}
val uploadId = listMultipartUploads.uploadId
val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)

for {
parts <- maybeS3Settings
.fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)

finalParts = parts.lastOption match {
case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize =>
parts
case _ =>
// We drop the last part here since its broken
parts.dropRight(1)
}
} yield Some(CurrentS3State(uploadId, finalParts.map(_.toPart)))
}
(currentKeys, previousKeys) = incompleteUploads.partition(_.key == key)
result <- if (currentKeys.isEmpty) {
if (previousKeys.isEmpty)
Future.successful(None)
else
extractStateFromUpload(previousKeys, current = false)
} else
extractStateFromUpload(currentKeys, current = true)
} yield result

}
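
One detail worth noting in `extractStateFromUpload` above: when the parts of an interrupted multipart upload are listed, a trailing part smaller than `S3.MinChunkSize` is dropped from the recovered state, since S3 only allows a part below the minimum size (5 MiB, which is what Alpakka's `S3.MinChunkSize` holds) as the final part of a completed upload, so such a fragment cannot be safely resumed from. Below is a minimal sketch of that rule, reduced to part sizes only; the `usableParts` helper is illustrative and not part of the commit.

object RecoveredParts {
  // Mirrors akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize (5 MiB).
  val MinChunkSize: Int = 5 * 1024 * 1024

  // Keep all listed parts unless the last one is undersized; an undersized trailing part
  // can only be the final part of a completed upload, so it is dropped rather than resumed from.
  def usableParts(partSizes: Seq[Long]): Seq[Long] =
    partSizes.lastOption match {
      case Some(size) if size >= MinChunkSize => partSizes
      case _                                  => partSizes.dropRight(1)
    }
}
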
@@ -148,6 +148,15 @@ class RealS3BackupClientSpec
}
)

def getKeysFromTwoDownloads(dataBucket: String): Future[(String, String)] = waitForS3Download(
dataBucket,
{
case Seq(first, second) => (first.key, second.key)
case rest =>
throw DownloadNotReady(rest)
}
)

def waitUntilBackupClientHasCommitted(backupClient: BackupClientChunkState[_],
step: FiniteDuration = 100 millis,
delay: FiniteDuration = 5 seconds
@@ -157,7 +166,7 @@ class RealS3BackupClientSpec
else
akka.pattern.after(step)(waitUntilBackupClientHasCommitted(backupClient, step, delay))

property("entire flow works properly from start to end") {
property("basic flow without interruptions using PeriodFromFirst works correctly") {
forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix)
) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
@@ -241,7 +250,142 @@ class RealS3BackupClientSpec
}
}

property("suspend/resume works correctly") {
property("suspend/resume using PeriodFromFirst creates separate object after resume point") {
forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix)
) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
logger.info(s"Data bucket is ${s3Config.dataBucket}")

val data = kafkaDataInChunksWithTimePeriod.data.flatten

val topics = data.map(_.topic).toSet

implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics)

implicit val config: S3Config = s3Config
implicit val backupConfig: Backup = Backup(PeriodFromFirst(1 minute))

val producerSettings = createProducer()

val killSwitch = KillSwitches.shared("kill-switch")

val backupClient =
new BackupClientChunkState(Some(s3Settings))(createKafkaClient(killSwitch),
implicitly,
implicitly,
implicitly,
implicitly
)

val asProducerRecords = toProducerRecords(data)
val baseSource = toSource(asProducerRecords, 30 seconds)

val adminClient = AdminClient.create(
Map[String, AnyRef](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> container.bootstrapServers
).asJava
)

val createTopics = adminClient.createTopics(topics.map { topic =>
new NewTopic(topic, 1, 1.toShort)
}.asJava)

val calculatedFuture = for {
_ <- createTopics.all().toCompletableFuture.asScala
_ <- createBucket(s3Config.dataBucket)
_ = backupClient.backup.run()
_ = baseSource.runWith(Producer.plainSink(producerSettings))
_ <- waitUntilBackupClientHasCommitted(backupClient)
_ = killSwitch.abort(TerminationException)
secondBackupClient <- akka.pattern.after(2 seconds) {
Future {
new BackupClient(Some(s3Settings))(
new KafkaClient(configureConsumer = baseKafkaConfig),
implicitly,
implicitly,
implicitly,
implicitly
)
}
}
_ = secondBackupClient.backup.run()
_ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head)
(firstKey, secondKey) <- getKeysFromTwoDownloads(s3Config.dataBucket)
firstDownloaded <- S3.download(s3Config.dataBucket, firstKey)
.withAttributes(s3Attrs)
.runWith(Sink.head)
.flatMap {
case Some((downloadSource, _)) =>
downloadSource
.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
.runWith(Sink.seq)
case None =>
              throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $firstKey")
}
secondDownloaded <- S3.download(s3Config.dataBucket, secondKey)
.withAttributes(s3Attrs)
.runWith(Sink.head)
.flatMap {
case Some((downloadSource, _)) =>
downloadSource
.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
.runWith(Sink.seq)
case None =>
              throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $secondKey")
}

} yield {
val first = firstDownloaded.toList.flatten.collect { case Some(reducedConsumerRecord) =>
reducedConsumerRecord
}

val second = secondDownloaded.toList.flatten.collect { case Some(reducedConsumerRecord) =>
reducedConsumerRecord
}
(first, second)
}

val (firstDownloaded, secondDownloaded) = calculatedFuture.futureValue

// Only care about ordering when it comes to key
val firstDownloadedGroupedAsKey = firstDownloaded
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

val secondDownloadedGroupedAsKey = secondDownloaded
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

val inputAsKey = data
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

val downloaded = (firstDownloadedGroupedAsKey.keySet ++ secondDownloadedGroupedAsKey.keySet).map { key =>
(key,
firstDownloadedGroupedAsKey.getOrElse(key, List.empty) ++ secondDownloadedGroupedAsKey.getOrElse(key,
List.empty
)
)
}.toMap

downloaded mustMatchTo inputAsKey

}
}

property("suspend/resume for same object using ChronoUnitSlice works correctly") {
forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix)
) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
@@ -258,10 +402,10 @@ class RealS3BackupClientSpec

val producerSettings = createProducer()

val firstKillSwitch = KillSwitches.shared("first-kill-switch")
val killSwitch = KillSwitches.shared("kill-switch")

val backupClient =
new BackupClientChunkState(Some(s3Settings))(createKafkaClient(firstKillSwitch),
new BackupClientChunkState(Some(s3Settings))(createKafkaClient(killSwitch),
implicitly,
implicitly,
implicitly,
@@ -288,7 +432,7 @@ class RealS3BackupClientSpec
_ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES)
_ = baseSource.runWith(Producer.plainSink(producerSettings))
_ <- waitUntilBackupClientHasCommitted(backupClient)
_ = firstKillSwitch.abort(TerminationException)
_ = killSwitch.abort(TerminationException)
secondBackupClient <- akka.pattern.after(2 seconds) {
Future {
new BackupClient(Some(s3Settings))(
@@ -301,7 +445,7 @@ class RealS3BackupClientSpec
}
}
_ = secondBackupClient.backup.run()
_ <- sendTopicAfterTimePeriod(1 minutes, producerSettings, topics.head)
_ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head)
key <- getKeyFromSingleDownload(s3Config.dataBucket)
downloaded <- S3.download(s3Config.dataBucket, key)
.withAttributes(s3Attrs)
@@ -55,24 +55,31 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
override val context: kafkaClientInterface.CursorContext
) extends ByteStringElement

protected sealed trait WhichState

protected case class Previous(key: String) extends WhichState
protected case object Current extends WhichState

/** Override this type to define the result of backing up data to a datasource
*/
type BackupResult

/** Override this type to define the result of calculating the previous state (if it exists)
*/
type CurrentState
type State

import BackupClientInterface._

/** Override this method to define how to terminate and complete a previously suspended backup for an earlier
  * key so that a fresh object can be started for the current time period (for example by appending the closing
  * `null]` fragment from `terminateSource` and completing the upload).
  * @param state
  *   The state of the previously suspended backup
  * @param previousKey
  *   The object key or filename of the backup being terminated
  * @return
  *   A [[Future]] containing the result of completing the previous backup
  */
def terminateAndCompletePreviousBackup(state: State, previousKey: String): Future[BackupResult]

/** Override this method to define how to retrieve the current state of a backup.
* @param key
* The object key or filename for what is being backed up
* @return
*   A [[Future]] that contains the state of a previously aborted backup, if one was found, along with whether it
*   belongs to the current or a previous key. Return [[None]] if it's a brand new backup.
*/
def getCurrentUploadState(key: String): Future[Option[CurrentState]]
def getCurrentUploadState(key: String): Future[Option[(State, WhichState)]]

/** Override this method to define how to backup a `ByteString` combined with Kafka
* `kafkaClientInterface.CursorContext` to a `DataSource`
@@ -84,7 +91,7 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
* A Sink that also provides a `BackupResult`
*/
def backupToStorageSink(key: String,
currentState: Option[CurrentState]
currentState: Option[State]
): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]]

/** Override this method to define a zero value that covers the case that occurs immediately when `SubFlow` has been
@@ -271,29 +278,43 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
}
}

protected val terminateSource: Source[ByteString, NotUsed] =
Source.single(ByteString("null]"))

/** Prepares the sink before it gets handed to `backupToStorageSink`
*/
private[backup] def prepareStartOfStream(state: Option[CurrentState],
private[backup] def prepareStartOfStream(maybeState: Option[(State, WhichState)],
start: Start
): Sink[ByteStringElement, Future[BackupResult]] =
if (state.isDefined)
Flow[ByteStringElement]
.flatMapPrefix(1) {
case Seq(byteStringElement: Start) =>
val withoutStartOfJsonArray = byteStringElement.data.drop(1)
Flow[ByteStringElement].prepend(
Source.single(byteStringElement.copy(data = withoutStartOfJsonArray))
)
case _ => throw Errors.ExpectedStartOfSource
}
.toMat(backupToStorageSink(start.key, state).contramap[ByteStringElement] { byteStringElement =>
(byteStringElement.data, byteStringElement.context)
})(Keep.right)
else
backupToStorageSink(start.key, state)
.contramap[ByteStringElement] { byteStringElement =>
(byteStringElement.data, byteStringElement.context)
maybeState match {
case Some((_, whichState)) =>
(backupConfig.timeConfiguration, whichState) match {
case (_: PeriodFromFirst, _: Previous) =>
backupToStorageSink(start.key, None)
.contramap[ByteStringElement] { byteStringElement =>
(byteStringElement.data, byteStringElement.context)
}
case _ =>
Flow[ByteStringElement]
.flatMapPrefix(1) {
case Seq(byteStringElement: Start) =>
val withoutStartOfJsonArray = byteStringElement.data.drop(1)
Flow[ByteStringElement].prepend(
Source.single(byteStringElement.copy(data = withoutStartOfJsonArray))
)
case _ => throw Errors.ExpectedStartOfSource
}
.toMat(backupToStorageSink(start.key, maybeState.map(_._1)).contramap[ByteStringElement] {
byteStringElement =>
(byteStringElement.data, byteStringElement.context)
})(Keep.right)
}
case None =>
backupToStorageSink(start.key, None)
.contramap[ByteStringElement] { byteStringElement =>
(byteStringElement.data, byteStringElement.context)
}
}

/** The entire flow that involves reading from Kafka, transforming the data into JSON and then backing it up into a
* data source.
@@ -360,8 +381,17 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
case start: Start =>
implicit val ec: ExecutionContext = system.getDispatcher
for {
state <- getCurrentUploadState(start.key)
} yield prepareStartOfStream(state, start)
maybeState <- getCurrentUploadState(start.key)
_ <- maybeState match {
case Some((state, whichState)) =>
(backupConfig.timeConfiguration, whichState) match {
case (_: PeriodFromFirst, previous: Previous) =>
terminateAndCompletePreviousBackup(state, previous.key)
case _ => Future.successful(None)
}
case None => Future.successful(None)
}
} yield prepareStartOfStream(maybeState, start)
case _ => throw Errors.ExpectedStartOfSource
},
empty
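
Two stream-level details make the branches above work. Resuming relies on `flatMapPrefix(1)` to peek at the `Start` element and strip its leading `[` so the JSON array already present in the interrupted upload is not reopened, and terminating a previous backup closes that array by streaming the `null]` fragment from `terminateSource` into the datasource sink (the tests decode objects as `List[Option[ReducedConsumerRecord]]`, which tolerates the trailing `null`). Below is a self-contained sketch of the same peek-and-rewrite pattern on plain `ByteString`s; the `dropLeadingBracket` name and the standalone flow are illustrative, not part of the commit.

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

object ResumeFraming {
  // Inspect the first element with flatMapPrefix(1), drop its leading '[' and prepend the
  // rewritten element back in front of the untouched remainder of the stream.
  val dropLeadingBracket: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].flatMapPrefix(1) {
      case Seq(first) => Flow[ByteString].prepend(Source.single(first.drop(1)))
      case _          => Flow[ByteString] // stream completed before any element arrived
    }
}
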
@@ -80,7 +80,7 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka
*/
override type BackupResult = Done

override type CurrentState = Nothing
override type State = Nothing

override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)

@@ -106,6 +106,9 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka
calculateBackupStreamPositions(sourceWithPeriods(sourceWithFirstRecord))
.toMat(Sink.collection)(Keep.right)
.run()

override def terminateAndCompletePreviousBackup(state: Nothing, previousKey: String): Future[Done] =
Future.successful(Done)
}

/** A `MockedBackupClientInterface` that also uses a mocked `KafkaClientInterface`
