diff --git a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala index c65133dd..286d22a0 100644 --- a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala +++ b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala @@ -1,5 +1,6 @@ package io.aiven.guardian.kafka.backup +import akka.NotUsed import akka.stream.scaladsl._ import akka.util.ByteString import io.aiven.guardian.kafka.Errors @@ -17,25 +18,6 @@ import java.time._ import java.time.format.DateTimeFormatter import java.time.temporal._ -/** A marker used to indicate in which position the current backup stream is - */ -sealed abstract class BackupStreamPosition - -object BackupStreamPosition { - - /** The backup stream has just started right now - */ - case object Start extends BackupStreamPosition - - /** The backup stream is in the middle of a time period - */ - case object Middle extends BackupStreamPosition - - /** The backup stream position has just hit a boundary for when a new period starts - */ - case object Boundary extends BackupStreamPosition -} - /** An interface for a template on how to backup a Kafka Stream into some data storage * @tparam T * The underlying `kafkaClientInterface` type @@ -44,6 +26,29 @@ trait BackupClientInterface[T <: KafkaClientInterface] { implicit val kafkaClientInterface: T implicit val backupConfig: Backup + /** An element from the original record + */ + private[backup] sealed trait RecordElement + private[backup] case class Element(reducedConsumerRecord: ReducedConsumerRecord, + context: kafkaClientInterface.CursorContext + ) extends RecordElement + private[backup] case object End extends RecordElement + + /** An element after the record has been transformed to a ByteString + */ + private[backup] sealed trait ByteStringElement { + val data: ByteString + val context: kafkaClientInterface.CursorContext + } + + private[backup] case class Start(override val data: ByteString, + override val context: kafkaClientInterface.CursorContext, + key: String + ) extends ByteStringElement + private[backup] case class Tail(override val data: ByteString, + override val context: kafkaClientInterface.CursorContext + ) extends ByteStringElement + /** Override this type to define the result of backing up data to a datasource */ type BackupResult @@ -66,23 +71,60 @@ trait BackupClientInterface[T <: KafkaClientInterface] { */ def empty: () => Future[BackupResult] - @nowarn("msg=not.*?exhaustive") private[backup] def calculateBackupStreamPositions( sourceWithPeriods: SourceWithContext[(ReducedConsumerRecord, Long), kafkaClientInterface.CursorContext, kafkaClientInterface.Control ] - ): SourceWithContext[(ReducedConsumerRecord, BackupStreamPosition), - kafkaClientInterface.CursorContext, - kafkaClientInterface.Control - ] = sourceWithPeriods - .sliding(2) - .map { case Seq((beforeReducedConsumerRecord, beforeDivisions), (_, afterDivisions)) => - val backupStreamPosition = splitAtBoundaryCondition(beforeDivisions, afterDivisions) + ): Source[RecordElement, kafkaClientInterface.Control] = + sourceWithPeriods.asSource + .prefixAndTail(2) + // This algorithm only works with Source's that have 2 or more elements + .flatMapConcat { + case (Seq( + ((firstReducedConsumerRecord, firstDivision), firstContext), + ((secondReducedConsumerRecord, secondDivision), secondContext) + ), + rest + ) => + val all = Source + .combine( + Source( + List( + 
((firstReducedConsumerRecord, firstDivision), firstContext),
+                  ((secondReducedConsumerRecord, secondDivision), secondContext)
+                )
+              ),
+              rest
+            )(Concat(_))
-      (beforeReducedConsumerRecord, backupStreamPosition)
-    }
-    .mapContext { case Seq(cursorContext, _) => cursorContext }
+
+          val withDivisions =
+            all
+              .sliding(2)
+              .map {
+                case Seq(((_, beforeDivisions), _), ((afterReducedConsumerRecord, afterDivisions), afterContext)) =>
+                  if (isAtBoundary(beforeDivisions, afterDivisions))
+                    List(
+                      End,
+                      Element(afterReducedConsumerRecord, afterContext)
+                    )
+                  else
+                    List(Element(afterReducedConsumerRecord, afterContext))
+                case rest =>
+                  throw Errors.UnhandledStreamCase(rest)
+              }
+              .mapConcat(identity)
+
+          Source.combine(
+            Source.single(Element(firstReducedConsumerRecord, firstContext)),
+            withDivisions
+          )(Concat(_))
+        // This case only occurs if a Source has a single element, so we just return it directly
+        case (Seq(((singleReducedConsumerRecord, _), singleContext)), _) =>
+          Source.single(Element(singleReducedConsumerRecord, singleContext))
+        case (rest, _) =>
+          throw Errors.UnhandledStreamCase(rest)
+      }
 
   private[backup] def sourceWithPeriods(
       source: Source[(OffsetDateTime, (ReducedConsumerRecord, kafkaClientInterface.CursorContext)),
@@ -107,12 +149,104 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
 
         Source.combine(
           Source.single((firstTimestamp, (firstReducedConsumerRecord, firstCursorContext))),
-          rest.map { case (reducedConsumerRecord, context) => (firstTimestamp, (reducedConsumerRecord, context)) }
+          rest.map { case (reducedConsumerRecord, context) =>
+            (firstTimestamp, (reducedConsumerRecord, context))
+          }
         )(Concat(_))
       case None => throw Errors.ExpectedStartOfSource
     }
   }
 
+  /** Transforms a sequence of [[RecordElement]]'s into a ByteString so that it can be persisted into the data storage
+    *
+    * @param sourceElements
+    *   A sequence of [[RecordElement]]'s as a result of `sliding(2)`
+    * @return
+    *   a [[ByteString]] ready to be persisted along with the original context from the [[RecordElement]]
+    */
+  private[backup] def transformReducedConsumerRecords(sourceElements: Seq[RecordElement]) = {
+    val stringWithContext = sourceElements match {
+      case Seq(Element(reducedConsumerRecord, context)) =>
+        // Happens in the Sentinel case that is explicitly called at the start of the stream OR when a stream is
+        // interrupted by the user, in which case the stream needs to be terminated with `null]` in order to be valid
+        List((s"${reducedConsumerRecordAsString(reducedConsumerRecord)},", Some(context)))
+      case Seq(Element(firstReducedConsumerRecord, firstContext),
+               Element(secondReducedConsumerRecord, secondContext)
+          ) =>
+        List(
+          (s"${reducedConsumerRecordAsString(firstReducedConsumerRecord)},", Some(firstContext)),
+          (s"${reducedConsumerRecordAsString(secondReducedConsumerRecord)},", Some(secondContext))
+        )
+      case Seq(Element(reducedConsumerRecord, context), End) =>
+        List((s"${reducedConsumerRecordAsString(reducedConsumerRecord)}]", Some(context)))
+      case Seq(End) =>
+        List(("]", None))
+      case rest => throw Errors.UnhandledStreamCase(rest)
+    }
+    stringWithContext.map { case (string, context) => (ByteString(string), context) }
+  }
+
+  private[backup] def dropCommaFromEndOfJsonArray(byteString: ByteString) =
+    byteString.dropRight(1)
+
+  /** Applies the transformation to the first element of a Stream so that it starts off as a JSON array.
+ * + * @param element + * Starting [[Element]] + * @param key + * The current key being processed + * @param terminate + * Whether to immediately terminate the JSON array for single element in Stream case + * @return + * A [[List]] containing a single [[Start]] ready to be processed in the [[Sink]] + */ + private[backup] def transformFirstElement(element: Element, key: String, terminate: Boolean) = + transformReducedConsumerRecords(List(element)).map { + case (byteString, Some(context)) => + if (terminate) + Start(ByteString("[") ++ dropCommaFromEndOfJsonArray(byteString) ++ ByteString("]"), context, key) + else + Start(ByteString("[") ++ byteString, context, key) + case _ => + throw Errors.UnhandledStreamCase(List(element)) + } + + /** Fixes the case where is an odd amount of elements in the stream + * @param head + * of stream as a result of `prefixAndTail` + * @param restSource + * of the stream as a result of `prefixAndTail` + * @return + * A [[List]] of ([[ByteString]], [[kafkaClientInterface.CursorContext]]) with the tail elements fixed up. + */ + private[backup] def transformTailingElement( + head: Seq[(ByteString, Option[kafkaClientInterface.CursorContext])], + restSource: Source[(ByteString, Option[kafkaClientInterface.CursorContext]), NotUsed] + ) = { + val restTransformed = restSource + .sliding(2, step = 2) + .map { + case Seq((before, Some(context)), (after, None)) => + List((dropCommaFromEndOfJsonArray(before) ++ after, context)) + case Seq((before, Some(beforeContext)), (after, Some(afterContext))) => + List((before, beforeContext), (after, afterContext)) + case Seq((single, Some(context))) => + List((single, context)) + case rest => + throw Errors.UnhandledStreamCase(rest) + } + + head match { + case Seq((byteString, Some(cursorContext))) => + Source.combine( + Source.single((List((byteString, cursorContext)))), + restTransformed + )(Concat(_)) + case rest => + throw Errors.UnhandledStreamCase(rest) + } + } + /** The entire flow that involves reading from Kafka, transforming the data into JSON and then backing it up into a * data source. * @return @@ -127,56 +261,71 @@ trait BackupClientInterface[T <: KafkaClientInterface] { val withBackupStreamPositions = calculateBackupStreamPositions(sourceWithPeriods(sourceWithFirstRecord)) - val split = withBackupStreamPositions.asSource.splitAfter { case ((_, backupStreamPosition), _) => - backupStreamPosition == BackupStreamPosition.Boundary - } + val split = withBackupStreamPositions + .splitAfter { case sourceElement => + sourceElement match { + case End => true + case _ => false + } + } val substreams = split - .prefixAndTail(1) - .flatMapConcat { case (head, restOfReducedConsumerRecords) => - head.headOption match { - case Some(((firstReducedConsumerRecord, _), firstContext)) => - // We need to retrieve the first element of the stream in order to calculate the key/filename - val key = calculateKey(firstReducedConsumerRecord.toOffsetDateTime) - - // Now that we have retrieved the first element of the stream, lets recombine it so we create the - // original stream - val combined = Source.combine( - Source.single( - ( - (firstReducedConsumerRecord, BackupStreamPosition.Start), - firstContext - ) - ), - restOfReducedConsumerRecords - )(Concat(_)) + .prefixAndTail(2) + .flatMapConcat { + case (Seq(only: Element, End), _) => + // This case only occurs when you have a single element in a timeslice. 
+ // We have to terminate immediately to create a JSON array with a single element + val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime) + Source(transformFirstElement(only, key, terminate = true)) + case (Seq(first: Element, second: Element), restOfReducedConsumerRecords) => + val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime) + val firstSource = transformFirstElement(first, key, terminate = false) + + val rest = Source.combine( + Source.single(second), + restOfReducedConsumerRecords + )(Concat(_)) - // Go through every element in the stream and convert the `ReducedCustomerRecord` to an actual bytestream - val transformed = combined.map { case ((record, position), context) => - val transform = transformReducedConsumerRecords(record, position) - (transform, context) - } + val restTransformed = rest + .sliding(2, step = 2) + .map(transformReducedConsumerRecords) + .mapConcat(identity) + .prefixAndTail(1) + .flatMapConcat((transformTailingElement _).tupled) + .mapConcat(identity) + .map { case (byteString, context) => Tail(byteString, context) } - transformed.map(data => (data, key)) - case None => Source.empty - } + Source.combine( + Source( + firstSource + ), + restTransformed + )(Concat(_)) + case (Seq(only: Element), _) => + // This case can also occur when user terminates the stream + val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime) + Source(transformFirstElement(only, key, terminate = false)) + case (rest, _) => + throw Errors.UnhandledStreamCase(rest) } // Note that .alsoTo triggers after .to, see https://stackoverflow.com/questions/47895991/multiple-sinks-in-the-same-stream#comment93028098_47896071 @nowarn("msg=method lazyInit in object Sink is deprecated") val subFlowSink = substreams - .alsoTo(kafkaClientInterface.commitCursor.contramap[((ByteString, kafkaClientInterface.CursorContext), String)] { - case ((_, context), _) => context + .alsoTo(kafkaClientInterface.commitCursor.contramap[ByteStringElement] { byteStringElement => + byteStringElement.context }) .to( // See https://stackoverflow.com/questions/68774425/combine-prefixandtail1-with-sink-lazysink-for-subflow-created-by-splitafter/68776660?noredirect=1#comment121558518_68776660 Sink.lazyInit( - { case (_, key) => - Future.successful( - backupToStorageSink(key).contramap[((ByteString, kafkaClientInterface.CursorContext), String)] { - case ((byteString, _), _) => byteString - } - ) + { + case start: Start => + Future.successful( + backupToStorageSink(start.key).contramap[ByteStringElement] { byteStringElement => + byteStringElement.data + } + ) + case _ => throw Errors.ExpectedStartOfSource }, empty ) @@ -202,44 +351,22 @@ object BackupClientInterface { def calculateKey(offsetDateTime: OffsetDateTime): String = s"${BackupClientInterface.formatOffsetDateTime(offsetDateTime)}.json" - /** Calculates the current position in 2 element sliding of a Stream. + /** Calculates whether we have rolled over a time period given number of divided periods. * @param dividedPeriodsBefore * The number of divided periods in the first element of the slide. 
-1 is used as a sentinel value to indicate the * start of the stream * @param dividedPeriodsAfter * The number of divided periods in the second element of the slide * @return - * The position of the Stream + * `true` if we have hit a time boundary otherwise `false` */ - def splitAtBoundaryCondition(dividedPeriodsBefore: Long, dividedPeriodsAfter: Long): BackupStreamPosition = + def isAtBoundary(dividedPeriodsBefore: Long, dividedPeriodsAfter: Long): Boolean = (dividedPeriodsBefore, dividedPeriodsAfter) match { case (before, after) if after > before => - BackupStreamPosition.Boundary + true case _ => - BackupStreamPosition.Middle - } - - /** Transforms a `ReducedConsumer` record into a ByteString so that it can be persisted into the data storage - * @param reducedConsumerRecord - * The ReducedConsumerRecord to persist - * @param backupStreamPosition - * The position of the record relative in the stream (so it knows if its at the start, middle or end) - * @return - * a `ByteString` ready to be persisted - */ - def transformReducedConsumerRecords(reducedConsumerRecord: ReducedConsumerRecord, - backupStreamPosition: BackupStreamPosition - ): ByteString = { - val string = backupStreamPosition match { - case BackupStreamPosition.Start => - s"[${reducedConsumerRecordAsString(reducedConsumerRecord)}," - case BackupStreamPosition.Middle => - s"${reducedConsumerRecordAsString(reducedConsumerRecord)}," - case BackupStreamPosition.Boundary => - s"${reducedConsumerRecordAsString(reducedConsumerRecord)}]" + false } - ByteString(string) - } protected def calculateNumberOfPeriodsFromTimestamp(initialTime: OffsetDateTime, period: FiniteDuration, diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala index d5ccc5a3..9f8c6eb4 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala @@ -4,14 +4,13 @@ import akka.actor.ActorSystem import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source -import com.softwaremill.diffx.generic.auto._ -import com.softwaremill.diffx.scalatest.DiffMatcher._ import io.aiven.guardian.akka.AkkaStreamTestKit import io.aiven.guardian.akka.AnyPropTestKit import io.aiven.guardian.kafka.Generators.KafkaDataWithTimePeriod import io.aiven.guardian.kafka.Generators.kafkaDataWithTimePeriodsGen import io.aiven.guardian.kafka.codecs.Circe._ import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import org.apache.kafka.common.record.TimestampType import org.mdedetrich.akka.stream.support.CirceStreamSupport import org.scalatest.Inspectors import org.scalatest.concurrent.ScalaFutures @@ -40,19 +39,15 @@ class BackupClientInterfaceSpec property("Ordered Kafka events should produce at least one BackupStreamPosition.Boundary") { forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => - val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - kafkaDataWithTimePeriod.periodSlice - ) + val mock = + new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), + kafkaDataWithTimePeriod.periodSlice + ) val calculatedFuture = mock.materializeBackupStreamPositions() - val result = calculatedFuture.futureValue - val backupStreamPositions = result.map { case (_, 
backupStreamPosition) => - backupStreamPosition - } - - Inspectors.forAtLeast(1, backupStreamPositions)( - _ must matchTo(BackupStreamPosition.Boundary: BackupStreamPosition) + Inspectors.forAtLeast(1, calculatedFuture.futureValue)( + _ must equal(mock.End: mock.RecordElement) ) } } @@ -61,16 +56,17 @@ class BackupClientInterfaceSpec "Every ReducedConsumerRecord after a BackupStreamPosition.Boundary should be in the next consecutive time period" ) { forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => - val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - kafkaDataWithTimePeriod.periodSlice - ) + val mock = + new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), + kafkaDataWithTimePeriod.periodSlice + ) val result = mock.materializeBackupStreamPositions().futureValue.toList val allBoundariesWithoutMiddles = result .sliding(2) - .collect { case Seq((_, _: BackupStreamPosition.Boundary.type), (afterRecord, _)) => - afterRecord + .collect { case Seq(mock.End, afterRecordRecordElement: mock.Element) => + afterRecordRecordElement } .toList @@ -83,8 +79,8 @@ class BackupClientInterfaceSpec Inspectors.forEvery(withBeforeAndAfter) { case (before, after) => val periodAsMillis = kafkaDataWithTimePeriod.periodSlice.toMillis - ((before.timestamp - initialTime) / periodAsMillis) mustNot equal( - (after.timestamp - initialTime) / periodAsMillis + ((before.reducedConsumerRecord.timestamp - initialTime) / periodAsMillis) mustNot equal( + (after.reducedConsumerRecord.timestamp - initialTime) / periodAsMillis ) } } @@ -95,58 +91,209 @@ class BackupClientInterfaceSpec "The time difference between two consecutive BackupStreamPosition.Middle's has to be less then the specified time period" ) { forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => - val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - kafkaDataWithTimePeriod.periodSlice - ) + val mock = + new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), + kafkaDataWithTimePeriod.periodSlice + ) val result = mock.materializeBackupStreamPositions().futureValue.toList val allCoupledMiddles = result .sliding(2) - .collect { - case Seq((beforeRecord, _: BackupStreamPosition.Middle.type), - (afterRecord, _: BackupStreamPosition.Middle.type) - ) => - (beforeRecord, afterRecord) + .collect { case Seq(before: mock.Element, after: mock.Element) => + (before, after) } .toList Inspectors.forEvery(allCoupledMiddles) { case (before, after) => - ChronoUnit.MICROS.between(before.toOffsetDateTime, - after.toOffsetDateTime + ChronoUnit.MICROS.between(before.reducedConsumerRecord.toOffsetDateTime, + after.reducedConsumerRecord.toOffsetDateTime ) must be < kafkaDataWithTimePeriod.periodSlice.toMicros } } } + property("the time difference between the first and last timestamp for a given key is less than time period") { + forAll(kafkaDataWithTimePeriodsGen().filter(_.data.size > 2)) { + (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => + val mock = + new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), + kafkaDataWithTimePeriod.periodSlice + ) + + mock.clear() + val calculatedFuture = for { + _ <- mock.backup.run() + _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + processedRecords = mock.mergeBackedUpData() + asRecords <- Future.sequence(processedRecords.map { 
case (key, byteString) => + Source + .single(byteString) + .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) + .toMat(Sink.collection)(Keep.right) + .run() + .map(records => + (key, + records.flatten.collect { case Some(v) => + v + } + ) + ) + }) + } yield asRecords + + val result = calculatedFuture.futureValue + + Inspectors.forEvery(result) { case (_, records) => + (records.headOption, records.lastOption) match { + case (Some(first), Some(last)) if first != last => + ChronoUnit.MICROS.between(first.toOffsetDateTime, + last.toOffsetDateTime + ) must be < kafkaDataWithTimePeriod.periodSlice.toMicros + case _ => + } + } + } + } + property("backup method completes flow correctly for all valid Kafka events") { - forAll(kafkaDataWithTimePeriodsGen()) { (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => - val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), - kafkaDataWithTimePeriod.periodSlice - ) + forAll(kafkaDataWithTimePeriodsGen().filter(_.data.size > 2)) { + (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => + val mock = + new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), + kafkaDataWithTimePeriod.periodSlice + ) - val calculatedFuture = for { - _ <- mock.backup.run() - _ <- akka.pattern.after(100 millis)(Future.successful(())) - processedRecords = mock.mergeBackedUpData - asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => - Source - .single(byteString) - .via(CirceStreamSupport.decode[List[ReducedConsumerRecord]]) - .toMat(Sink.collection)(Keep.right) - .run() - .map(records => (key, records.flatten)) - }) - } yield asRecords - - val result = calculatedFuture.futureValue - - val observed = result.flatMap { case (_, values) => values } - - kafkaDataWithTimePeriod.data.containsSlice(observed) mustEqual true - if (observed.nonEmpty) { - observed.head must matchTo(kafkaDataWithTimePeriod.data.head) - } + mock.clear() + val calculatedFuture = for { + _ <- mock.backup.run() + _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + processedRecords = mock.mergeBackedUpData() + asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => + Source + .single(byteString) + .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) + .toMat(Sink.collection)(Keep.right) + .run() + .map(records => + (key, + records.flatten.collect { case Some(v) => + v + } + ) + ) + }) + } yield asRecords + + val result = calculatedFuture.futureValue + + val observed = result.flatMap { case (_, values) => values } + + kafkaDataWithTimePeriod.data mustEqual observed + } + } + + property("backup method completes flow correctly for single element") { + val reducedConsumerRecord = ReducedConsumerRecord("", 1, "key", "value", 1, TimestampType.CREATE_TIME) + + val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source.single( + reducedConsumerRecord + ), + 1 day + ) + mock.clear() + val calculatedFuture = for { + _ <- mock.backup.run() + _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + processedRecords = mock.mergeBackedUpData() + asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => + Source + .single(byteString) + .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) + .toMat(Sink.collection)(Keep.right) + .run() + .map(records => + (key, + records.flatten.collect { case Some(v) => + v + } + ) + ) + }) + } yield asRecords + + val result = 
calculatedFuture.futureValue + + val observed = result.flatMap { case (_, values) => values } + + List(reducedConsumerRecord) mustEqual observed + } + + property("backup method completes flow correctly for two elements") { + val reducedConsumerRecords = List( + ReducedConsumerRecord("", 1, "key", "value1", 1, TimestampType.CREATE_TIME), + ReducedConsumerRecord("", 2, "key", "value2", 2, TimestampType.CREATE_TIME) + ) + + val mock = new MockedBackupClientInterfaceWithMockedKafkaData(Source( + reducedConsumerRecords + ), + 1 millis + ) + mock.clear() + val calculatedFuture = for { + _ <- mock.backup.run() + _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + processedRecords = mock.mergeBackedUpData() + asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => + Source + .single(byteString) + .via(CirceStreamSupport.decode[List[Option[ReducedConsumerRecord]]]) + .toMat(Sink.collection)(Keep.right) + .run() + .map(records => + (key, + records.flatten.collect { case Some(v) => + v + } + ) + ) + }) + } yield asRecords + + val result = calculatedFuture.futureValue + + val observed = result.flatMap { case (_, values) => values } + + reducedConsumerRecords mustEqual observed + } + + property("backup method correctly terminates every key apart from last") { + forAll(kafkaDataWithTimePeriodsGen().filter(_.data.size > 1)) { + (kafkaDataWithTimePeriod: KafkaDataWithTimePeriod) => + val mock = + new MockedBackupClientInterfaceWithMockedKafkaData(Source(kafkaDataWithTimePeriod.data), + kafkaDataWithTimePeriod.periodSlice + ) + + mock.clear() + val calculatedFuture = for { + _ <- mock.backup.run() + _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + processedRecords = mock.mergeBackedUpData(terminate = false) + } yield processedRecords.splitAt(processedRecords.length - 1) + + val (terminated, nonTerminated) = calculatedFuture.futureValue + + if (nonTerminated.nonEmpty) { + Inspectors.forEvery(terminated) { case (_, byteString) => + byteString.utf8String.takeRight(2) mustEqual "}]" + } + } + + Inspectors.forEvery(nonTerminated) { case (_, byteString) => + byteString.utf8String.takeRight(2) mustEqual "}," + } } } } diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala index ff88382f..0e9ed4e3 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala @@ -8,6 +8,7 @@ import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import akka.util.ByteString import io.aiven.guardian.kafka.MockedKafkaClientInterface +import io.aiven.guardian.kafka.Utils._ import io.aiven.guardian.kafka.backup.configs.Backup import io.aiven.guardian.kafka.models.ReducedConsumerRecord @@ -16,6 +17,7 @@ import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ +import java.time.OffsetDateTime import java.util.concurrent.ConcurrentLinkedQueue /** A mocked `BackupClientInterface` which given a `kafkaClientInterface` allows you to @@ -30,21 +32,44 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka * `ByteString`. 
Use `mergeBackedUpData` to process `backedUpData` into a more convenient data structure once you * have finished writing to it */ - val backedUpData: Iterable[(String, ByteString)] = new ConcurrentLinkedQueue[(String, ByteString)]().asScala + val backedUpData: ConcurrentLinkedQueue[(String, ByteString)] = new ConcurrentLinkedQueue[(String, ByteString)]() /** This method is intended to be called after you have written to it during a test. + * @param terminate + * Whether to terminate the ByteString with `null]` so its valid parsable JSON + * @param sort + * Whether to sort the outputting collection. There are sometimes corner cases when dealing with small sets of + * static data that the outputted stream can be unordered. * @return * `backupData` with all of the `ByteString` data merged for each unique key */ - def mergeBackedUpData: List[(String, ByteString)] = backedUpData - .groupBy { case (key, _) => - key - } - .view - .mapValues { data => - data.toList.map { case (_, byteString) => byteString }.foldLeft(ByteString())(_ ++ _) - } - .toList + def mergeBackedUpData(terminate: Boolean = true, sort: Boolean = true): List[(String, ByteString)] = { + val base = backedUpData.asScala + .orderedGroupBy { case (key, _) => + key + } + .view + .mapValues { data => + val complete = data.map { case (_, byteString) => byteString }.foldLeft(ByteString())(_ ++ _) + if (terminate) + if (complete.utf8String.endsWith("},")) + complete ++ ByteString("null]") + else + complete + else + complete + } + .toList + if (sort) + base.sortBy { case (key, _) => + val withoutExtension = key.substring(0, key.lastIndexOf('.')) + OffsetDateTime.parse(withoutExtension).getNano + } + else + base + } + + def clear(): Unit = backedUpData.clear() override implicit lazy val backupConfig: Backup = Backup( periodSlice @@ -64,16 +89,13 @@ class MockedBackupClientInterface(override val kafkaClientInterface: MockedKafka * A Sink that also provides a `BackupResult` */ override def backupToStorageSink(key: String): Sink[ByteString, Future[Done]] = Sink.foreach { byteString => - backedUpData ++ Iterable((key, byteString)) + backedUpData.add((key, byteString)) } def materializeBackupStreamPositions()(implicit system: ActorSystem - ): Future[immutable.Iterable[(ReducedConsumerRecord, BackupStreamPosition)]] = - calculateBackupStreamPositions(sourceWithPeriods(sourceWithFirstRecord)).asSource - .map { case (data, _) => - data - } + ): Future[immutable.Iterable[RecordElement]] = + calculateBackupStreamPositions(sourceWithPeriods(sourceWithFirstRecord)) .toMat(Sink.collection)(Keep.right) .run() } diff --git a/core/src/main/scala/io/aiven/guardian/kafka/Errors.scala b/core/src/main/scala/io/aiven/guardian/kafka/Errors.scala index 7e3d3b35..41d1d331 100644 --- a/core/src/main/scala/io/aiven/guardian/kafka/Errors.scala +++ b/core/src/main/scala/io/aiven/guardian/kafka/Errors.scala @@ -6,4 +6,8 @@ object Errors { case object ExpectedStartOfSource extends Errors { override def getMessage: String = "Always expect a single element at the start of a stream" } + + final case class UnhandledStreamCase[T](elems: Seq[T]) extends Errors { + override def getMessage: String = s"Unhandled case for stream ${elems.map(_.toString).mkString(",")}" + } } diff --git a/core/src/test/scala/io/aiven/guardian/akka/AkkaStreamTestKit.scala b/core/src/test/scala/io/aiven/guardian/akka/AkkaStreamTestKit.scala index 794ccdc4..abbf527c 100644 --- a/core/src/test/scala/io/aiven/guardian/akka/AkkaStreamTestKit.scala +++ 
b/core/src/test/scala/io/aiven/guardian/akka/AkkaStreamTestKit.scala
@@ -6,9 +6,17 @@ import akka.testkit.TestKitBase
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.Suite
 
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
 trait AkkaStreamTestKit extends TestKitBase with BeforeAndAfterAll {
   this: Suite =>
   implicit val system: ActorSystem
 
   override protected def afterAll(): Unit =
     TestKit.shutdownActorSystem(system)
+
+  /** If it's not possible to determine whether a Stream has finished in a test and instead you need to use a manual
+    * wait, make sure you wait at least this period of time for akka-streams to initialize properly.
+    */
+  val AkkaStreamInitializationConstant: FiniteDuration = 500 millis
 }
diff --git a/core/src/test/scala/io/aiven/guardian/kafka/Utils.scala b/core/src/test/scala/io/aiven/guardian/kafka/Utils.scala
index 925d745d..3371c5ca 100644
--- a/core/src/test/scala/io/aiven/guardian/kafka/Utils.scala
+++ b/core/src/test/scala/io/aiven/guardian/kafka/Utils.scala
@@ -2,6 +2,10 @@ package io.aiven.guardian.kafka
 
 import org.apache.kafka.common.KafkaFuture
 
+import scala.collection.immutable
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
 import java.util.concurrent.CompletableFuture
 
 object Utils {
@@ -21,4 +25,21 @@ object Utils {
     }
   }
 
+  /** The standard Scala groupBy returns an `immutable.Map`, which is unordered; this version returns an ordered
+    * `ListMap` for when preserving insertion order is important
+    */
+  implicit class GroupBy[A](val t: IterableOnce[A]) {
+    def orderedGroupBy[K](f: A => K): immutable.ListMap[K, List[A]] = {
+      var m = immutable.ListMap.empty[K, ListBuffer[A]]
+      for (elem <- t.iterator) {
+        val key = f(elem)
+        m = m.updatedWith(key) {
+          case Some(value) => Some(value.addOne(elem))
+          case None        => Some(mutable.ListBuffer[A](elem))
+        }
+      }
+      m.map { case (k, v) => (k, v.toList) }
+    }
+  }
+
 }
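A minimal usage sketch of the new `orderedGroupBy` helper from `Utils.scala`, assuming the implicit is brought into scope via `import io.aiven.guardian.kafka.Utils._` (the same way `MockedBackupClientInterface` does); the sample keys and values below are illustrative only, not taken from the test suite.

import io.aiven.guardian.kafka.Utils._

// Unlike the standard groupBy (which returns an unordered immutable.Map), keys come out
// in first-seen order, which is why mergeBackedUpData in the mocked client uses it when
// stitching the backed-up ByteStrings back together per key.
val backedUp = List(
  ("2021-08-01T00:00:00Z.json", "a"),
  ("2021-08-01T00:01:00Z.json", "b"),
  ("2021-08-01T00:00:00Z.json", "c")
)

val grouped = backedUp.orderedGroupBy { case (key, _) => key }
// grouped is an immutable.ListMap with keys in insertion order:
//   "2021-08-01T00:00:00Z.json" -> List(("2021-08-01T00:00:00Z.json", "a"), ("2021-08-01T00:00:00Z.json", "c"))
//   "2021-08-01T00:01:00Z.json" -> List(("2021-08-01T00:01:00Z.json", "b"))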