Fix handling for suspend/resume case for PeriodFromFirst
mdedetrich committed Jan 24, 2022
1 parent 241eb33 commit e89782d
Showing 5 changed files with 344 additions and 85 deletions.
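The first file below is the GCS `BackupClient`: the `CurrentState` type member is renamed to `State`, `getCurrentUploadState` now returns an `UploadStateResult` instead of an `Option`, and a new `backupToStorageTerminateSink` completes the upload belonging to a previous key. A minimal sketch of the state types this diff relies on (an assumption inferred from how they are used; the real definitions live elsewhere in the repository, presumably in the shared backup-client interface, and may differ):

```scala
// Sketch only: shapes inferred from UploadStateResult.empty,
// PreviousState(state, previousKey) and previousState.state.uploadId below.
trait UploadStateModel {
  // Storage-specific resume state, e.g. CurrentS3State for S3, Nothing for GCS.
  type State

  // A dangling upload that belongs to an earlier key and must be terminated.
  final case class PreviousState(state: State, previousKey: String)

  // Resume information for the current key plus any leftover previous upload.
  final case class UploadStateResult(current: Option[State], previous: Option[PreviousState])

  object UploadStateResult {
    val empty: UploadStateResult = UploadStateResult(None, None)
  }
}
```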
@@ -29,9 +29,21 @@ class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[Google

override type BackupResult = Option[StorageObject]

override type CurrentState = Nothing
override type State = Nothing

override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)
override def getCurrentUploadState(key: String): Future[UploadStateResult] =
Future.successful(UploadStateResult.empty)

override def backupToStorageTerminateSink(
previousState: PreviousState
): Sink[ByteString, Future[Option[StorageObject]]] = {
val base = GCStorage
.resumableUpload(gcsConfig.dataBucket, previousState.previousKey, ContentTypes.`application/json`)
.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))

maybeGoogleSettings
.fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings)))
}

override def backupToStorageSink(key: String,
currentState: Option[Nothing]
@@ -46,5 +58,4 @@ class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[Google
byteString
}
}

}
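Both the GCS sink above and the S3 sinks below expose their result as an `Option` by mapping the materialized `Future` on `ExecutionContext.parasitic`, which runs the callback on the completing thread instead of a dispatcher. A small sketch of that pattern, assuming Akka Streams on Scala 2.13:

```scala
import akka.stream.scaladsl.Sink
import akka.util.ByteString

import scala.concurrent.{ExecutionContext, Future}

// Wrap a sink's materialized Future[A] into Future[Option[A]] without
// scheduling the map on a dispatcher.
def optionalResult[A](sink: Sink[ByteString, Future[A]]): Sink[ByteString, Future[Option[A]]] =
  sink.mapMaterializedValue(_.map(Some(_))(ExecutionContext.parasitic))
```

The hunks that follow apply the same state-model changes to the S3 `BackupClient`.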
@@ -4,6 +4,7 @@ import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.FailedUploadPart
import akka.stream.alpakka.s3.ListMultipartUploadResultUploads
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.alpakka.s3.Part
import akka.stream.alpakka.s3.S3Attributes
@@ -40,10 +41,55 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings

override type BackupResult = Option[MultipartUploadResult]

override type CurrentState = CurrentS3State
override type State = CurrentS3State

private def extractStateFromUpload(keys: Seq[ListMultipartUploadResultUploads], current: Boolean)(implicit
executionContext: ExecutionContext
): Future[Some[(CurrentS3State, String)]] = {
val listMultipartUploads = keys match {
case Seq(single) =>
if (current)
logger.info(
s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
)
else
logger.info(
s"Found current uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
)
single
case rest =>
val last = rest.maxBy(_.initiated)(Ordering[Instant])
if (current)
logger.warn(
s"Found multiple currently cancelled uploads for key: ${last.key} and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
)
else
logger.warn(
s"Found multiple previously cancelled uploads for key: ${last.key} and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
)
last
}
val uploadId = listMultipartUploads.uploadId
val key = listMultipartUploads.key
val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)

override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = {
implicit val ec: ExecutionContext = system.classicSystem.getDispatcher
for {
parts <- maybeS3Settings
.fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)

finalParts = parts.lastOption match {
case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize =>
parts
case _ =>
// We drop the last part here since it's broken (smaller than the S3 minimum chunk size)
parts.dropRight(1)
}
} yield Some((CurrentS3State(uploadId, finalParts.map(_.toPart)), key))
}

def getCurrentUploadState(key: String): Future[UploadStateResult] = {
implicit val ec: ExecutionContext = system.dispatcher

val baseListMultipart = S3.listMultipartUpload(s3Config.dataBucket, None)

@@ -52,41 +98,19 @@
maybeS3Settings
.fold(baseListMultipart)(s3Settings => baseListMultipart.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)
keys = incompleteUploads.filter(_.key == key)
result <- if (keys.isEmpty)
Future.successful(None)
else {
val listMultipartUploads = keys match {
case Seq(single) =>
logger.info(
s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
)
single
case rest =>
val last = rest.maxBy(_.initiated)(Ordering[Instant])
logger.warn(
s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
)
last
}
val uploadId = listMultipartUploads.uploadId
val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)

for {
parts <- maybeS3Settings
.fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)

finalParts = parts.lastOption match {
case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize =>
parts
case _ =>
// We drop the last part here since it's broken
parts.dropRight(1)
}
} yield Some(CurrentS3State(uploadId, finalParts.map(_.toPart)))
}
} yield result
(currentKeys, previousKeys) = incompleteUploads.partition(_.key == key)
current <- if (currentKeys.nonEmpty)
extractStateFromUpload(currentKeys, current = true)
else
Future.successful(None)
previous <- if (previousKeys.nonEmpty)
extractStateFromUpload(previousKeys, current = false)
else
Future.successful(None)

} yield UploadStateResult(current.map(_._1),
previous.map { case (state, previousKey) => PreviousState(state, previousKey) }
)

}
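The `finalParts` step above drops a trailing part that is smaller than `S3.MinChunkSize`, because S3 rejects non-final multipart parts below that size, so such a fragment cannot be built upon when the upload is resumed and its data has to be re-sent. A standalone sketch of the rule (`sizeOf` stands in for the Alpakka part type's size accessor):

```scala
// Keep all listed parts when the last one is at least the minimum chunk size;
// otherwise drop the broken trailing part so its data gets uploaded again.
def dropBrokenTrailingPart[P](parts: Seq[P], minChunkSize: Long)(sizeOf: P => Long): Seq[P] =
  parts.lastOption match {
    case Some(last) if sizeOf(last) >= minChunkSize => parts
    case _                                          => parts.dropRight(1)
  }
```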

@@ -126,6 +150,28 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings
Sink.combine(success, failure)(Broadcast(_))
}

override def backupToStorageTerminateSink(
previousState: PreviousState
): Sink[ByteString, Future[BackupResult]] = {
logger.info(
s"Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId:${previousState.state.uploadId}"
)
val sink = S3
.resumeMultipartUploadWithHeaders(
s3Config.dataBucket,
previousState.previousKey,
previousState.state.uploadId,
previousState.state.parts,
s3Headers = s3Headers,
chunkingParallelism = 1
)

val base = sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))

maybeS3Settings
.fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
}
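As elsewhere in this file, custom settings are applied only when `maybeS3Settings` is defined, by folding over the option and attaching `S3Attributes`. A small helper capturing that recurring pattern (a sketch; the project simply inlines the `fold`):

```scala
import akka.stream.alpakka.s3.{S3Attributes, S3Settings}
import akka.stream.scaladsl.Sink

// Attach custom S3 settings to a sink only when they were provided; otherwise
// the sink falls back to the system-wide Alpakka S3 configuration.
def withOptionalS3Settings[In, Mat](sink: Sink[In, Mat], maybeSettings: Option[S3Settings]): Sink[In, Mat] =
  maybeSettings.fold(sink)(settings => sink.withAttributes(S3Attributes.settings(settings)))
```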

override def backupToStorageSink(key: String,
currentState: Option[CurrentS3State]
): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {
@@ -148,6 +148,15 @@ class RealS3BackupClientSpec
}
)

def getKeysFromTwoDownloads(dataBucket: String): Future[(String, String)] = waitForS3Download(
dataBucket,
{
case Seq(first, second) => (first.key, second.key)
case rest =>
throw DownloadNotReady(rest)
}
)

def waitUntilBackupClientHasCommitted(backupClient: BackupClientChunkState[_],
step: FiniteDuration = 100 millis,
delay: FiniteDuration = 5 seconds
@@ -157,7 +166,7 @@
else
akka.pattern.after(step)(waitUntilBackupClientHasCommitted(backupClient, step, delay))

property("entire flow works properly from start to end") {
property("basic flow without interruptions using PeriodFromFirst works correctly") {
forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix)
) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
@@ -241,7 +250,142 @@
}
}

property("suspend/resume works correctly") {
property("suspend/resume using PeriodFromFirst creates separate object after resume point") {
forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix)
) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
logger.info(s"Data bucket is ${s3Config.dataBucket}")

val data = kafkaDataInChunksWithTimePeriod.data.flatten

val topics = data.map(_.topic).toSet

implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics)

implicit val config: S3Config = s3Config
implicit val backupConfig: Backup = Backup(PeriodFromFirst(1 minute))

val producerSettings = createProducer()

val killSwitch = KillSwitches.shared("kill-switch")

val backupClient =
new BackupClientChunkState(Some(s3Settings))(createKafkaClient(killSwitch),
implicitly,
implicitly,
implicitly,
implicitly
)

val asProducerRecords = toProducerRecords(data)
val baseSource = toSource(asProducerRecords, 30 seconds)

val adminClient = AdminClient.create(
Map[String, AnyRef](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> container.bootstrapServers
).asJava
)

val createTopics = adminClient.createTopics(topics.map { topic =>
new NewTopic(topic, 1, 1.toShort)
}.asJava)

val calculatedFuture = for {
_ <- createTopics.all().toCompletableFuture.asScala
_ <- createBucket(s3Config.dataBucket)
_ = backupClient.backup.run()
_ = baseSource.runWith(Producer.plainSink(producerSettings))
_ <- waitUntilBackupClientHasCommitted(backupClient)
_ = killSwitch.abort(TerminationException)
secondBackupClient <- akka.pattern.after(2 seconds) {
Future {
new BackupClient(Some(s3Settings))(
new KafkaClient(configureConsumer = baseKafkaConfig),
implicitly,
implicitly,
implicitly,
implicitly
)
}
}
_ = secondBackupClient.backup.run()
_ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head)
(firstKey, secondKey) <- getKeysFromTwoDownloads(s3Config.dataBucket)
firstDownloaded <- S3.download(s3Config.dataBucket, firstKey)
.withAttributes(s3Attrs)
.runWith(Sink.head)
.flatMap {
case Some((downloadSource, _)) =>
downloadSource
.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
.runWith(Sink.seq)
case None =>
throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key")
}
secondDownloaded <- S3.download(s3Config.dataBucket, secondKey)
.withAttributes(s3Attrs)
.runWith(Sink.head)
.flatMap {
case Some((downloadSource, _)) =>
downloadSource
.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
.runWith(Sink.seq)
case None =>
throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key")
}

} yield {
val first = firstDownloaded.toList.flatten.collect { case Some(reducedConsumerRecord) =>
reducedConsumerRecord
}

val second = secondDownloaded.toList.flatten.collect { case Some(reducedConsumerRecord) =>
reducedConsumerRecord
}
(first, second)
}

val (firstDownloaded, secondDownloaded) = calculatedFuture.futureValue

// Only the ordering of values within a key matters, so group the records by key
val firstDownloadedGroupedAsKey = firstDownloaded
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

val secondDownloadedGroupedAsKey = secondDownloaded
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

val inputAsKey = data
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

val downloaded = (firstDownloadedGroupedAsKey.keySet ++ secondDownloadedGroupedAsKey.keySet).map { key =>
(key,
firstDownloadedGroupedAsKey.getOrElse(key, List.empty) ++ secondDownloadedGroupedAsKey.getOrElse(key,
List.empty
)
)
}.toMap

downloaded mustMatchTo inputAsKey

}
}
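The assertion block above only compares values per key: records from the two downloaded objects are grouped by key, concatenated, and matched against the produced data. A compact sketch of that grouping step (hypothetical helper, not part of the diff):

```scala
// Group (key, value) pairs by key, preserving the per-key value order, so that
// ordering across different keys does not affect the comparison.
def groupValuesByKey[K, V](records: Seq[(K, V)]): Map[K, Seq[V]] =
  records.groupBy { case (key, _) => key }.view
    .mapValues(_.map { case (_, value) => value })
    .toMap
```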

property("suspend/resume for same object using ChronoUnitSlice works correctly") {
forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix)
) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
@@ -258,10 +402,10 @@

val producerSettings = createProducer()

val firstKillSwitch = KillSwitches.shared("first-kill-switch")
val killSwitch = KillSwitches.shared("kill-switch")

val backupClient =
new BackupClientChunkState(Some(s3Settings))(createKafkaClient(firstKillSwitch),
new BackupClientChunkState(Some(s3Settings))(createKafkaClient(killSwitch),
implicitly,
implicitly,
implicitly,
@@ -288,7 +432,7 @@
_ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES)
_ = baseSource.runWith(Producer.plainSink(producerSettings))
_ <- waitUntilBackupClientHasCommitted(backupClient)
_ = firstKillSwitch.abort(TerminationException)
_ = killSwitch.abort(TerminationException)
secondBackupClient <- akka.pattern.after(2 seconds) {
Future {
new BackupClient(Some(s3Settings))(
@@ -301,7 +445,7 @@
}
}
_ = secondBackupClient.backup.run()
_ <- sendTopicAfterTimePeriod(1 minutes, producerSettings, topics.head)
_ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head)
key <- getKeyFromSingleDownload(s3Config.dataBucket)
downloaded <- S3.download(s3Config.dataBucket, key)
.withAttributes(s3Attrs)