Fix handling for suspend/resume case for PeriodFromFirst #88

Merged
merged 1 commit on Jan 26, 2022
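For orientation: the diff below widens getCurrentUploadState from returning a single optional state to an UploadStateResult that also reports a dangling upload left behind under a previous key, and adds backupToStorageTerminateSink so that upload can be finished into its own object. The state-carrying types it uses are defined in the shared interface module and are not part of the visible hunks; the following is a minimal Scala sketch of their assumed shape, inferred purely from how the changed code uses them. Names and exact signatures are assumptions, not the repository's actual definitions.

```scala
import akka.stream.alpakka.s3.Part

// Sketch of the assumed S3-specific upload state (uploadId of the in-flight
// multipart upload plus the parts already uploaded), inferred from usage below.
final case class CurrentS3State(uploadId: String, parts: Seq[Part])

// Sketch of the assumed interface-level types; the real definitions live in the
// shared BackupClientInterface and may differ.
trait UploadStateSketch {
  type State // CurrentS3State for the S3 client, Nothing for the GCS client

  // An interrupted upload that was writing to an older key.
  final case class PreviousState(state: State, previousKey: String)

  // What getCurrentUploadState now returns: possibly an upload to resume for the
  // requested key, and possibly a dangling upload for a previous key that still
  // has to be terminated into its own object.
  final case class UploadStateResult(current: Option[State], previous: Option[PreviousState])

  object UploadStateResult {
    val empty: UploadStateResult = UploadStateResult(None, None)
  }
}
```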
@@ -29,9 +29,21 @@ class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[Google

override type BackupResult = Option[StorageObject]

override type CurrentState = Nothing
override type State = Nothing

override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None)
override def getCurrentUploadState(key: String): Future[UploadStateResult] =
Future.successful(UploadStateResult.empty)

override def backupToStorageTerminateSink(
previousState: PreviousState
): Sink[ByteString, Future[Option[StorageObject]]] = {
val base = GCStorage
.resumableUpload(gcsConfig.dataBucket, previousState.previousKey, ContentTypes.`application/json`)
.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))

maybeGoogleSettings
.fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings)))
}

override def backupToStorageSink(key: String,
currentState: Option[Nothing]
@@ -46,5 +58,4 @@ class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[Google
byteString
}
}

}
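As a usage illustration (not code from this PR): once a caller holds the terminate sink for an interrupted run, it can stream whatever closing bytes the JSON document still needs into it and wait on the materialized Future. The helper and its names below are hypothetical; only the sink type matches what backupToStorageTerminateSink above returns.

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.googlecloud.storage.StorageObject
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

object GcsTerminateSketch {
  // Push one last chunk of bytes through the terminate sink; the materialized
  // Future completes with the finished StorageObject once GCS finalises the
  // resumable upload for the previous key.
  def finishPreviousGcsObject(
      terminateSink: Sink[ByteString, Future[Option[StorageObject]]],
      closingBytes: ByteString
  )(implicit system: ActorSystem): Future[Option[StorageObject]] =
    Source.single(closingBytes).runWith(terminateSink)
}
```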
@@ -4,6 +4,7 @@ import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.FailedUploadPart
import akka.stream.alpakka.s3.ListMultipartUploadResultUploads
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.alpakka.s3.Part
import akka.stream.alpakka.s3.S3Attributes
@@ -40,10 +41,55 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings

override type BackupResult = Option[MultipartUploadResult]

override type CurrentState = CurrentS3State
override type State = CurrentS3State

private def extractStateFromUpload(keys: Seq[ListMultipartUploadResultUploads], current: Boolean)(implicit
executionContext: ExecutionContext
): Future[Some[(CurrentS3State, String)]] = {
val listMultipartUploads = keys match {
case Seq(single) =>
if (current)
logger.info(
s"Found current uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
)
else
logger.info(
s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
)
single
case rest =>
val last = rest.maxBy(_.initiated)(Ordering[Instant])
if (current)
logger.warn(
s"Found multiple currently cancelled uploads for key: ${last.key} and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
)
else
logger.warn(
s"Found multiple previously cancelled uploads for key: ${last.key} and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
)
last
}
val uploadId = listMultipartUploads.uploadId
val key = listMultipartUploads.key
val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)

override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = {
implicit val ec: ExecutionContext = system.classicSystem.getDispatcher
for {
parts <- maybeS3Settings
.fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)

finalParts = parts.lastOption match {
case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize =>
parts
case _ =>
// We drop the last part here since it's broken
parts.dropRight(1)
}
} yield Some((CurrentS3State(uploadId, finalParts.map(_.toPart)), key))
}
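The finalParts step above drops a trailing part that is smaller than Alpakka's minimum chunk size, presumably because such a part was only partially flushed when the stream was cut and cannot safely be resumed on top of. A standalone sketch of that rule follows; PartInfo and the 5 MiB constant are stand-ins for the part descriptions returned by S3.listParts and for akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize, not the library's actual types.

```scala
object PartFilterSketch {
  // Stand-in for a listed multipart part; only its size matters for this rule.
  final case class PartInfo(partNumber: Int, size: Long)

  // Assumed to match Alpakka's S3.MinChunkSize (5 MiB).
  val MinChunkSize: Int = 5 * 1024 * 1024

  // Keep every listed part unless the trailing one is shorter than the minimum
  // chunk size, in which case it is treated as a partially flushed chunk from
  // the interrupted stream and discarded.
  def usableParts(parts: Seq[PartInfo]): Seq[PartInfo] =
    parts.lastOption match {
      case Some(last) if last.size >= MinChunkSize => parts
      case _                                       => parts.dropRight(1)
    }

  // usableParts(Seq(PartInfo(1, 5242880), PartInfo(2, 131072)))
  //   == Seq(PartInfo(1, 5242880))
}
```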

def getCurrentUploadState(key: String): Future[UploadStateResult] = {
implicit val ec: ExecutionContext = system.dispatcher

val baseListMultipart = S3.listMultipartUpload(s3Config.dataBucket, None)

@@ -52,41 +98,19 @@
maybeS3Settings
.fold(baseListMultipart)(s3Settings => baseListMultipart.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)
keys = incompleteUploads.filter(_.key == key)
result <- if (keys.isEmpty)
Future.successful(None)
else {
val listMultipartUploads = keys match {
case Seq(single) =>
logger.info(
s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}"
)
single
case rest =>
val last = rest.maxBy(_.initiated)(Ordering[Instant])
logger.warn(
s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}"
)
last
}
val uploadId = listMultipartUploads.uploadId
val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId)

for {
parts <- maybeS3Settings
.fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings)))
.runWith(Sink.seq)

finalParts = parts.lastOption match {
case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize =>
parts
case _ =>
// We drop the last part here since its broken
parts.dropRight(1)
}
} yield Some(CurrentS3State(uploadId, finalParts.map(_.toPart)))
}
} yield result
(currentKeys, previousKeys) = incompleteUploads.partition(_.key == key)
current <- if (currentKeys.nonEmpty)
extractStateFromUpload(currentKeys, current = true)
else
Future.successful(None)
previous <- if (previousKeys.nonEmpty)
extractStateFromUpload(previousKeys, current = false)
else
Future.successful(None)

} yield UploadStateResult(current.map(_._1),
previous.map { case (state, previousKey) => PreviousState(state, previousKey) }
)

}
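The call site that consumes UploadStateResult is not part of this diff (it lives in the shared BackupClientInterface), but the intended decision can be sketched roughly as below. Everything here, including the method name prepareResume and the simplified sink signature, is illustrative rather than the repository's actual code; the real backupToStorageSink also carries Kafka cursor contexts, which is elided.

```scala
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.{ExecutionContext, Future}

// Assumed orchestration sketch of how the two-field result is likely used.
trait ResumeDecisionSketch {
  type State
  final case class PreviousState(state: State, previousKey: String)
  final case class UploadStateResult(current: Option[State], previous: Option[PreviousState])

  def getCurrentUploadState(key: String): Future[UploadStateResult]
  def backupToStorageTerminateSink(previous: PreviousState): Sink[ByteString, Future[_]]

  // On resume with PeriodFromFirst the restarted client writes to a brand new key,
  // so any upload left dangling under an older key is first terminated into its
  // own finished object; the state for the current key (if any) is handed back so
  // the normal backup sink can resume it.
  def prepareResume(currentKey: String, closingBytes: ByteString)(implicit
      mat: Materializer,
      ec: ExecutionContext
  ): Future[Option[State]] =
    for {
      stateResult <- getCurrentUploadState(currentKey)
      _ <- stateResult.previous match {
             case Some(previous) =>
               Source
                 .single(closingBytes)
                 .runWith(backupToStorageTerminateSink(previous))
                 .map(_ => ())
             case None => Future.successful(())
           }
    } yield stateResult.current
}
```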

@@ -126,6 +150,28 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings
Sink.combine(success, failure)(Broadcast(_))
}

override def backupToStorageTerminateSink(
previousState: PreviousState
): Sink[ByteString, Future[BackupResult]] = {
logger.info(
s"Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId:${previousState.state.uploadId}"
)
val sink = S3
.resumeMultipartUploadWithHeaders(
s3Config.dataBucket,
previousState.previousKey,
previousState.state.uploadId,
previousState.state.parts,
s3Headers = s3Headers,
chunkingParallelism = 1
)

val base = sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic))

maybeS3Settings
.fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings)))
}

override def backupToStorageSink(key: String,
currentState: Option[CurrentS3State]
): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = {
@@ -149,6 +149,15 @@ class RealS3BackupClientSpec
}
)

def getKeysFromTwoDownloads(dataBucket: String): Future[(String, String)] = waitForS3Download(
dataBucket,
{
case Seq(first, second) => (first.key, second.key)
case rest =>
throw DownloadNotReady(rest)
}
)

def waitUntilBackupClientHasCommitted(backupClient: BackupClientChunkState[_],
step: FiniteDuration = 100 millis,
delay: FiniteDuration = 5 seconds
@@ -158,7 +167,7 @@
else
akka.pattern.after(step)(waitUntilBackupClientHasCommitted(backupClient, step, delay))

property("entire flow works properly from start to end") {
property("basic flow without interruptions using PeriodFromFirst works correctly") {
forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix)
) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
@@ -242,7 +251,142 @@
}
}

property("suspend/resume works correctly") {
property("suspend/resume using PeriodFromFirst creates separate object after resume point") {
forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix)
) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
logger.info(s"Data bucket is ${s3Config.dataBucket}")

val data = kafkaDataInChunksWithTimePeriod.data.flatten

val topics = data.map(_.topic).toSet

implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics)

implicit val config: S3Config = s3Config
implicit val backupConfig: Backup = Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 minute))

val producerSettings = createProducer()

val killSwitch = KillSwitches.shared("kill-switch")

val backupClient =
new BackupClientChunkState(Some(s3Settings))(createKafkaClient(killSwitch),
implicitly,
implicitly,
implicitly,
implicitly
)

val asProducerRecords = toProducerRecords(data)
val baseSource = toSource(asProducerRecords, 30 seconds)

val adminClient = AdminClient.create(
Map[String, AnyRef](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> container.bootstrapServers
).asJava
)

val createTopics = adminClient.createTopics(topics.map { topic =>
new NewTopic(topic, 1, 1.toShort)
}.asJava)

val calculatedFuture = for {
_ <- createTopics.all().toCompletableFuture.asScala
_ <- createBucket(s3Config.dataBucket)
_ = backupClient.backup.run()
_ = baseSource.runWith(Producer.plainSink(producerSettings))
_ <- waitUntilBackupClientHasCommitted(backupClient)
_ = killSwitch.abort(TerminationException)
secondBackupClient <- akka.pattern.after(2 seconds) {
Future {
new BackupClient(Some(s3Settings))(
new KafkaClient(configureConsumer = baseKafkaConfig),
implicitly,
implicitly,
implicitly,
implicitly
)
}
}
_ = secondBackupClient.backup.run()
_ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head)
(firstKey, secondKey) <- getKeysFromTwoDownloads(s3Config.dataBucket)
firstDownloaded <- S3.download(s3Config.dataBucket, firstKey)
.withAttributes(s3Attrs)
.runWith(Sink.head)
.flatMap {
case Some((downloadSource, _)) =>
downloadSource
.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
.runWith(Sink.seq)
case None =>
throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $firstKey")
}
secondDownloaded <- S3.download(s3Config.dataBucket, secondKey)
.withAttributes(s3Attrs)
.runWith(Sink.head)
.flatMap {
case Some((downloadSource, _)) =>
downloadSource
.via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]])
.runWith(Sink.seq)
case None =>
throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $secondKey")
}

} yield {
val first = firstDownloaded.toList.flatten.collect { case Some(reducedConsumerRecord) =>
reducedConsumerRecord
}

val second = secondDownloaded.toList.flatten.collect { case Some(reducedConsumerRecord) =>
reducedConsumerRecord
}
(first, second)
}

val (firstDownloaded, secondDownloaded) = calculatedFuture.futureValue

// Only care about ordering when it comes to key
val firstDownloadedGroupedAsKey = firstDownloaded
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

val secondDownloadedGroupedAsKey = secondDownloaded
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

val inputAsKey = data
.groupBy(_.key)
.view
.mapValues { reducedConsumerRecords =>
reducedConsumerRecords.map(_.value)
}
.toMap

val downloaded = (firstDownloadedGroupedAsKey.keySet ++ secondDownloadedGroupedAsKey.keySet).map { key =>
(key,
firstDownloadedGroupedAsKey.getOrElse(key, List.empty) ++ secondDownloadedGroupedAsKey.getOrElse(key,
List.empty
)
)
}.toMap

downloaded mustMatchTo inputAsKey

}
}

property("suspend/resume for same object using ChronoUnitSlice works correctly") {
forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson),
s3ConfigGen(useVirtualDotHost, bucketPrefix)
) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) =>
@@ -260,10 +404,10 @@

val producerSettings = createProducer()

val firstKillSwitch = KillSwitches.shared("first-kill-switch")
val killSwitch = KillSwitches.shared("kill-switch")

val backupClient =
new BackupClientChunkState(Some(s3Settings))(createKafkaClient(firstKillSwitch),
new BackupClientChunkState(Some(s3Settings))(createKafkaClient(killSwitch),
implicitly,
implicitly,
implicitly,
@@ -290,7 +434,7 @@
_ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES)
_ = baseSource.runWith(Producer.plainSink(producerSettings))
_ <- waitUntilBackupClientHasCommitted(backupClient)
_ = firstKillSwitch.abort(TerminationException)
_ = killSwitch.abort(TerminationException)
secondBackupClient <- akka.pattern.after(2 seconds) {
Future {
new BackupClient(Some(s3Settings))(
@@ -303,7 +447,7 @@
}
}
_ = secondBackupClient.backup.run()
_ <- sendTopicAfterTimePeriod(1 minutes, producerSettings, topics.head)
_ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head)
key <- getKeyFromSingleDownload(s3Config.dataBucket)
downloaded <- S3.download(s3Config.dataBucket, key)
.withAttributes(s3Attrs)