From 4514490deb1586cd0db61433ceff3f868c8414dc Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Fri, 21 Jan 2022 12:50:28 +0100 Subject: [PATCH] Fix handling for suspend/resume case for PeriodFromFirst --- .../kafka/backup/gcs/BackupClient.scala | 17 +- .../kafka/backup/s3/BackupClient.scala | 122 +++++++++----- .../backup/s3/RealS3BackupClientSpec.scala | 156 +++++++++++++++++- .../kafka/backup/BackupClientInterface.scala | 117 +++++++++---- .../backup/MockedBackupClientInterface.scala | 17 +- 5 files changed, 344 insertions(+), 85 deletions(-) diff --git a/backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala b/backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala index 382cc4d3..e9897504 100644 --- a/backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala +++ b/backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala @@ -29,9 +29,21 @@ class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[Google override type BackupResult = Option[StorageObject] - override type CurrentState = Nothing + override type State = Nothing - override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None) + override def getCurrentUploadState(key: String): Future[UploadStateResult] = + Future.successful(UploadStateResult.empty) + + override def backupToStorageTerminateSink( + previousState: PreviousState + ): Sink[ByteString, Future[Option[StorageObject]]] = { + val base = GCStorage + .resumableUpload(gcsConfig.dataBucket, previousState.previousKey, ContentTypes.`application/json`) + .mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic)) + + maybeGoogleSettings + .fold(base)(googleSettings => base.withAttributes(GoogleAttributes.settings(googleSettings))) + } override def backupToStorageSink(key: String, currentState: Option[Nothing] @@ -46,5 +58,4 @@ class BackupClient[T <: KafkaClientInterface](maybeGoogleSettings: Option[Google byteString } } - } diff --git a/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala b/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala index 7ae5f32b..8709794e 100644 --- a/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala +++ b/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala @@ -4,6 +4,7 @@ import akka.Done import akka.NotUsed import akka.actor.ActorSystem import akka.stream.alpakka.s3.FailedUploadPart +import akka.stream.alpakka.s3.ListMultipartUploadResultUploads import akka.stream.alpakka.s3.MultipartUploadResult import akka.stream.alpakka.s3.Part import akka.stream.alpakka.s3.S3Attributes @@ -40,10 +41,55 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings override type BackupResult = Option[MultipartUploadResult] - override type CurrentState = CurrentS3State + override type State = CurrentS3State + + private def extractStateFromUpload(keys: Seq[ListMultipartUploadResultUploads], current: Boolean)(implicit + executionContext: ExecutionContext + ): Future[Some[(CurrentS3State, String)]] = { + val listMultipartUploads = keys match { + case Seq(single) => + if (current) + logger.info( + s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}" + ) + else + logger.info( + s"Found current uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}" + ) + single + case rest => + val last = rest.maxBy(_.initiated)(Ordering[Instant]) + if (current) + logger.warn( + s"Found multiple currently cancelled uploads for key: ${last.key} and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}" + ) + else + logger.warn( + s"Found multiple previously cancelled uploads for key: ${last.key} and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}" + ) + last + } + val uploadId = listMultipartUploads.uploadId + val key = listMultipartUploads.key + val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId) - override def getCurrentUploadState(key: String): Future[Option[CurrentS3State]] = { - implicit val ec: ExecutionContext = system.classicSystem.getDispatcher + for { + parts <- maybeS3Settings + .fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings))) + .runWith(Sink.seq) + + finalParts = parts.lastOption match { + case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize => + parts + case _ => + // We drop the last part here since its broken + parts.dropRight(1) + } + } yield Some((CurrentS3State(uploadId, finalParts.map(_.toPart)), key)) + } + + def getCurrentUploadState(key: String): Future[UploadStateResult] = { + implicit val ec: ExecutionContext = system.dispatcher val baseListMultipart = S3.listMultipartUpload(s3Config.dataBucket, None) @@ -52,41 +98,19 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings maybeS3Settings .fold(baseListMultipart)(s3Settings => baseListMultipart.withAttributes(S3Attributes.settings(s3Settings))) .runWith(Sink.seq) - keys = incompleteUploads.filter(_.key == key) - result <- if (keys.isEmpty) - Future.successful(None) - else { - val listMultipartUploads = keys match { - case Seq(single) => - logger.info( - s"Found previous uploadId: ${single.uploadId} and bucket: ${s3Config.dataBucket} with key: ${single.key}" - ) - single - case rest => - val last = rest.maxBy(_.initiated)(Ordering[Instant]) - logger.warn( - s"Found multiple previously cancelled uploads for key: $key and bucket: ${s3Config.dataBucket}, picking uploadId: ${last.uploadId}" - ) - last - } - val uploadId = listMultipartUploads.uploadId - val baseList = S3.listParts(s3Config.dataBucket, key, listMultipartUploads.uploadId) - - for { - parts <- maybeS3Settings - .fold(baseList)(s3Settings => baseList.withAttributes(S3Attributes.settings(s3Settings))) - .runWith(Sink.seq) - - finalParts = parts.lastOption match { - case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize => - parts - case _ => - // We drop the last part here since its broken - parts.dropRight(1) - } - } yield Some(CurrentS3State(uploadId, finalParts.map(_.toPart))) - } - } yield result + (currentKeys, previousKeys) = incompleteUploads.partition(_.key == key) + current <- if (currentKeys.nonEmpty) + extractStateFromUpload(currentKeys, current = true) + else + Future.successful(None) + previous <- if (previousKeys.nonEmpty) + extractStateFromUpload(previousKeys, current = false) + else + Future.successful(None) + + } yield UploadStateResult(current.map(_._1), + previous.map { case (state, previousKey) => PreviousState(state, previousKey) } + ) } @@ -126,6 +150,28 @@ class BackupClient[T <: KafkaClientInterface](maybeS3Settings: Option[S3Settings Sink.combine(success, failure)(Broadcast(_)) } + override def backupToStorageTerminateSink( + previousState: PreviousState + ): Sink[ByteString, Future[BackupResult]] = { + logger.info( + s"Terminating and completing previous backup with key: ${previousState.previousKey} and uploadId:${previousState.state.uploadId}" + ) + val sink = S3 + .resumeMultipartUploadWithHeaders( + s3Config.dataBucket, + previousState.previousKey, + previousState.state.uploadId, + previousState.state.parts, + s3Headers = s3Headers, + chunkingParallelism = 1 + ) + + val base = sink.mapMaterializedValue(future => future.map(result => Some(result))(ExecutionContext.parasitic)) + + maybeS3Settings + .fold(base)(s3Settings => base.withAttributes(S3Attributes.settings(s3Settings))) + } + override def backupToStorageSink(key: String, currentState: Option[CurrentS3State] ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] = { diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala index 3f96851f..a40be6d2 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala @@ -149,6 +149,15 @@ class RealS3BackupClientSpec } ) + def getKeysFromTwoDownloads(dataBucket: String): Future[(String, String)] = waitForS3Download( + dataBucket, + { + case Seq(first, second) => (first.key, second.key) + case rest => + throw DownloadNotReady(rest) + } + ) + def waitUntilBackupClientHasCommitted(backupClient: BackupClientChunkState[_], step: FiniteDuration = 100 millis, delay: FiniteDuration = 5 seconds @@ -158,7 +167,7 @@ class RealS3BackupClientSpec else akka.pattern.after(step)(waitUntilBackupClientHasCommitted(backupClient, step, delay)) - property("entire flow works properly from start to end") { + property("basic flow without interruptions using PeriodFromFirst works correctly") { forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson), s3ConfigGen(useVirtualDotHost, bucketPrefix) ) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) => @@ -242,7 +251,142 @@ class RealS3BackupClientSpec } } - property("suspend/resume works correctly") { + property("suspend/resume using PeriodFromFirst creates separate object after resume point") { + forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson), + s3ConfigGen(useVirtualDotHost, bucketPrefix) + ) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) => + logger.info(s"Data bucket is ${s3Config.dataBucket}") + + val data = kafkaDataInChunksWithTimePeriod.data.flatten + + val topics = data.map(_.topic).toSet + + implicit val kafkaClusterConfig: KafkaCluster = KafkaCluster(topics) + + implicit val config: S3Config = s3Config + implicit val backupConfig: Backup = Backup(MockedBackupClientInterface.KafkaGroupId, PeriodFromFirst(1 minute)) + + val producerSettings = createProducer() + + val killSwitch = KillSwitches.shared("kill-switch") + + val backupClient = + new BackupClientChunkState(Some(s3Settings))(createKafkaClient(killSwitch), + implicitly, + implicitly, + implicitly, + implicitly + ) + + val asProducerRecords = toProducerRecords(data) + val baseSource = toSource(asProducerRecords, 30 seconds) + + val adminClient = AdminClient.create( + Map[String, AnyRef]( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> container.bootstrapServers + ).asJava + ) + + val createTopics = adminClient.createTopics(topics.map { topic => + new NewTopic(topic, 1, 1.toShort) + }.asJava) + + val calculatedFuture = for { + _ <- createTopics.all().toCompletableFuture.asScala + _ <- createBucket(s3Config.dataBucket) + _ = backupClient.backup.run() + _ = baseSource.runWith(Producer.plainSink(producerSettings)) + _ <- waitUntilBackupClientHasCommitted(backupClient) + _ = killSwitch.abort(TerminationException) + secondBackupClient <- akka.pattern.after(2 seconds) { + Future { + new BackupClient(Some(s3Settings))( + new KafkaClient(configureConsumer = baseKafkaConfig), + implicitly, + implicitly, + implicitly, + implicitly + ) + } + } + _ = secondBackupClient.backup.run() + _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) + (firstKey, secondKey) <- getKeysFromTwoDownloads(s3Config.dataBucket) + firstDownloaded <- S3.download(s3Config.dataBucket, firstKey) + .withAttributes(s3Attrs) + .runWith(Sink.head) + .flatMap { + case Some((downloadSource, _)) => + downloadSource + .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) + .runWith(Sink.seq) + case None => + throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key") + } + secondDownloaded <- S3.download(s3Config.dataBucket, secondKey) + .withAttributes(s3Attrs) + .runWith(Sink.head) + .flatMap { + case Some((downloadSource, _)) => + downloadSource + .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) + .runWith(Sink.seq) + case None => + throw new Exception(s"Expected object in bucket ${s3Config.dataBucket} with key $key") + } + + } yield { + val first = firstDownloaded.toList.flatten.collect { case Some(reducedConsumerRecord) => + reducedConsumerRecord + } + + val second = secondDownloaded.toList.flatten.collect { case Some(reducedConsumerRecord) => + reducedConsumerRecord + } + (first, second) + } + + val (firstDownloaded, secondDownloaded) = calculatedFuture.futureValue + + // Only care about ordering when it comes to key + val firstDownloadedGroupedAsKey = firstDownloaded + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val secondDownloadedGroupedAsKey = secondDownloaded + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val inputAsKey = data + .groupBy(_.key) + .view + .mapValues { reducedConsumerRecords => + reducedConsumerRecords.map(_.value) + } + .toMap + + val downloaded = (firstDownloadedGroupedAsKey.keySet ++ secondDownloadedGroupedAsKey.keySet).map { key => + (key, + firstDownloadedGroupedAsKey.getOrElse(key, List.empty) ++ secondDownloadedGroupedAsKey.getOrElse(key, + List.empty + ) + ) + }.toMap + + downloaded mustMatchTo inputAsKey + + } + } + + property("suspend/resume for same object using ChronoUnitSlice works correctly") { forAll(kafkaDataWithMinSizeGen(S3.MinChunkSize, 2, reducedConsumerRecordsToJson), s3ConfigGen(useVirtualDotHost, bucketPrefix) ) { (kafkaDataInChunksWithTimePeriod: KafkaDataInChunksWithTimePeriod, s3Config: S3Config) => @@ -260,10 +404,10 @@ class RealS3BackupClientSpec val producerSettings = createProducer() - val firstKillSwitch = KillSwitches.shared("first-kill-switch") + val killSwitch = KillSwitches.shared("kill-switch") val backupClient = - new BackupClientChunkState(Some(s3Settings))(createKafkaClient(firstKillSwitch), + new BackupClientChunkState(Some(s3Settings))(createKafkaClient(killSwitch), implicitly, implicitly, implicitly, @@ -290,7 +434,7 @@ class RealS3BackupClientSpec _ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES) _ = baseSource.runWith(Producer.plainSink(producerSettings)) _ <- waitUntilBackupClientHasCommitted(backupClient) - _ = firstKillSwitch.abort(TerminationException) + _ = killSwitch.abort(TerminationException) secondBackupClient <- akka.pattern.after(2 seconds) { Future { new BackupClient(Some(s3Settings))( @@ -303,7 +447,7 @@ class RealS3BackupClientSpec } } _ = secondBackupClient.backup.run() - _ <- sendTopicAfterTimePeriod(1 minutes, producerSettings, topics.head) + _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) key <- getKeyFromSingleDownload(s3Config.dataBucket) downloaded <- S3.download(s3Config.dataBucket, key) .withAttributes(s3Attrs) diff --git a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala index 657f55cf..f376cf1d 100644 --- a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala +++ b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala @@ -4,6 +4,7 @@ import akka.NotUsed import akka.actor.ActorSystem import akka.stream.scaladsl._ import akka.util.ByteString +import com.typesafe.scalalogging.StrictLogging import io.aiven.guardian.kafka.Errors import io.aiven.guardian.kafka.backup.configs.Backup import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice @@ -26,7 +27,7 @@ import java.time.temporal._ * @tparam T * The underlying `kafkaClientInterface` type */ -trait BackupClientInterface[T <: KafkaClientInterface] { +trait BackupClientInterface[T <: KafkaClientInterface] extends StrictLogging { implicit val kafkaClientInterface: T implicit val backupConfig: Backup implicit val system: ActorSystem @@ -54,36 +55,61 @@ trait BackupClientInterface[T <: KafkaClientInterface] { override val context: kafkaClientInterface.CursorContext ) extends ByteStringElement + case class PreviousState(state: State, previousKey: String) + case class UploadStateResult(current: Option[State], previous: Option[PreviousState]) + object UploadStateResult { + val empty: UploadStateResult = UploadStateResult(None, None) + } + /** Override this type to define the result of backing up data to a datasource */ type BackupResult /** Override this type to define the result of calculating the previous state (if it exists) */ - type CurrentState + type State import BackupClientInterface._ - /** Override this method to define how to retrieve the current state of a backup. + /** Override this method to define how to retrieve the current state of any unfinished backups. * @param key - * The object key or filename for what is being backed up + * The object key or filename for what is currently being backed up + * @return + * A [[Future]] with a [[UploadStateResult]] data structure that optionally contains the state associated with + * `key` along with the previous latest state before `key` (if it exists) + */ + def getCurrentUploadState(key: String): Future[UploadStateResult] + + /** A sink that is executed whenever a previously existing Backup needs to be terminated and closed. Generally + * speaking this [[Sink]] is similar to the [[backupToStorageSink]] except that + * [[kafkaClientInterface.CursorContext]] is not required since no Kafka messages are being written. + * + * Note that the terminate refers to the fact that this Sink is executed with a `null]` [[Source]] which when written + * to an already existing unfinished backup terminates the containing JSON array so that it becomes valid parsable + * JSON. + * @param previousState + * A data structure containing both the [[State]] along with the associated key which you can refer to in order to + * define your [[Sink]] * @return - * An optional [[Future]] that contains the state if it found a previously aborted backup. Return [[None]] if if - * its a brand new backup. + * A [[Sink]] that points to an existing key defined by `previousState.previousKey` */ - def getCurrentUploadState(key: String): Future[Option[CurrentState]] + def backupToStorageTerminateSink(previousState: PreviousState): Sink[ByteString, Future[BackupResult]] /** Override this method to define how to backup a `ByteString` combined with Kafka * `kafkaClientInterface.CursorContext` to a `DataSource` * @param key * The object key or filename for what is being backed up * @param currentState - * The current state if it exists. This is used when resuming from a previously aborted backup + * The current state if it exists. If this is empty then a new backup is being created with the associated `key` + * otherwise if this contains a [[State]] then the defined [[Sink]] needs to handle resuming a previously + * unfinished backup with that `key` by directly appending the [[ByteString]] data. * @return - * A Sink that also provides a `BackupResult` + * A [[Sink]] that given a [[ByteString]] (containing a single Kafka [[ReducedConsumerRecord]]) along with its + * [[kafkaClientInterface.CursorContext]] backs up the data to your data storage. The [[Sink]] is also responsible + * for executing [[kafkaClientInterface.commitCursor]] when the data is successfully backed up */ def backupToStorageSink(key: String, - currentState: Option[CurrentState] + currentState: Option[State] ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[BackupResult]] /** Override this method to define a zero vale that covers the case that occurs immediately when `SubFlow` has been @@ -270,29 +296,57 @@ trait BackupClientInterface[T <: KafkaClientInterface] { } } + private[backup] val terminateSource: Source[ByteString, NotUsed] = + Source.single(ByteString("null]")) + /** Prepares the sink before it gets handed to `backupToStorageSink` */ - private[backup] def prepareStartOfStream(state: Option[CurrentState], + private[backup] def prepareStartOfStream(uploadStateResult: UploadStateResult, start: Start ): Sink[ByteStringElement, Future[BackupResult]] = - if (state.isDefined) - Flow[ByteStringElement] - .flatMapPrefix(1) { - case Seq(byteStringElement: Start) => - val withoutStartOfJsonArray = byteStringElement.data.drop(1) - Flow[ByteStringElement].prepend( - Source.single(byteStringElement.copy(data = withoutStartOfJsonArray)) + (uploadStateResult.previous, uploadStateResult.current) match { + case (Some(previous), None) => + backupConfig.timeConfiguration match { + case _: PeriodFromFirst => + backupToStorageSink(start.key, None) + .contramap[ByteStringElement] { byteStringElement => + (byteStringElement.data, byteStringElement.context) + } + case _: ChronoUnitSlice => + logger.warn( + s"Detected previous backup using PeriodFromFirst however current configuration is now changed to ChronoUnitSlice. Object/file with an older key: ${start.key} may contain newer events than object/file with newer key: ${previous.previousKey}" ) - case _ => throw Errors.ExpectedStartOfSource + backupToStorageSink(start.key, None) + .contramap[ByteStringElement] { byteStringElement => + (byteStringElement.data, byteStringElement.context) + } } - .toMat(backupToStorageSink(start.key, state).contramap[ByteStringElement] { byteStringElement => - (byteStringElement.data, byteStringElement.context) - })(Keep.right) - else - backupToStorageSink(start.key, state) - .contramap[ByteStringElement] { byteStringElement => - (byteStringElement.data, byteStringElement.context) + case (None, Some(current)) => + backupConfig.timeConfiguration match { + case _: PeriodFromFirst => + throw Errors.UnhandledStreamCase(List(current)) + case _: ChronoUnitSlice => + Flow[ByteStringElement] + .flatMapPrefix(1) { + case Seq(byteStringElement: Start) => + val withoutStartOfJsonArray = byteStringElement.data.drop(1) + Flow[ByteStringElement].prepend( + Source.single(byteStringElement.copy(data = withoutStartOfJsonArray)) + ) + case _ => throw Errors.ExpectedStartOfSource + } + .toMat(backupToStorageSink(start.key, Some(current)).contramap[ByteStringElement] { byteStringElement => + (byteStringElement.data, byteStringElement.context) + })(Keep.right) } + case (None, None) => + backupToStorageSink(start.key, None) + .contramap[ByteStringElement] { byteStringElement => + (byteStringElement.data, byteStringElement.context) + } + case (Some(previous), Some(current)) => + throw Errors.UnhandledStreamCase(List(previous.state, current)) + } /** The entire flow that involves reading from Kafka, transforming the data into JSON and then backing it up into a * data source. @@ -357,10 +411,15 @@ trait BackupClientInterface[T <: KafkaClientInterface] { Sink.lazyInit( { case start: Start => - implicit val ec: ExecutionContext = system.getDispatcher + implicit val ec: ExecutionContext = system.dispatcher for { - state <- getCurrentUploadState(start.key) - } yield prepareStartOfStream(state, start) + uploadStateResult <- getCurrentUploadState(start.key) + _ <- (uploadStateResult.previous, uploadStateResult.current) match { + case (Some(previous), None) => + terminateSource.runWith(backupToStorageTerminateSink(previous)).map(Some.apply) + case _ => Future.successful(None) + } + } yield prepareStartOfStream(uploadStateResult, start) case _ => throw Errors.ExpectedStartOfSource }, empty diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala index 681665de..422d0318 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala @@ -82,19 +82,18 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka */ override type BackupResult = Done - override type CurrentState = Nothing + override type State = Nothing - override def getCurrentUploadState(key: String): Future[Option[Nothing]] = Future.successful(None) + override def getCurrentUploadState(key: String): Future[UploadStateResult] = + Future.successful(UploadStateResult.empty) override def empty: () => Future[Done] = () => Future.successful(Done) - /** Override this method to define how to backup a `ByteString` to a `DataSource` - * - * @param key - * The object key or filename for what is being backed up - * @return - * A Sink that also provides a `BackupResult` - */ + override def backupToStorageTerminateSink(previousState: PreviousState): Sink[ByteString, Future[Done]] = + Sink.foreach[ByteString] { byteString => + backedUpData.add((previousState.previousKey, byteString)) + } + override def backupToStorageSink(key: String, currentState: Option[Nothing] ): Sink[(ByteString, kafkaClientInterface.CursorContext), Future[Done]] =