Fix fencepost issues with stream algorithm plus fix tests
mdedetrich authored Jan 4, 2022
1 parent cb5ebe4 commit 4b9593f
Showing 6 changed files with 495 additions and 166 deletions.
@@ -1,5 +1,6 @@
package io.aiven.guardian.kafka.backup

import akka.NotUsed
import akka.stream.scaladsl._
import akka.util.ByteString
import io.aiven.guardian.kafka.Errors
@@ -17,25 +18,6 @@ import java.time._
import java.time.format.DateTimeFormatter
import java.time.temporal._

/** A marker used to indicate in which position the current backup stream is
*/
sealed abstract class BackupStreamPosition

object BackupStreamPosition {

/** The backup stream has just started right now
*/
case object Start extends BackupStreamPosition

/** The backup stream is in the middle of a time period
*/
case object Middle extends BackupStreamPosition

/** The backup stream position has just hit a boundary for when a new period starts
*/
case object Boundary extends BackupStreamPosition
}

/** An interface for a template on how to back up a Kafka Stream into some data storage
* @tparam T
* The underlying `kafkaClientInterface` type
@@ -44,6 +26,29 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
implicit val kafkaClientInterface: T
implicit val backupConfig: Backup

/** An element from the original record
*/
private[backup] sealed trait RecordElement
private[backup] case class Element(reducedConsumerRecord: ReducedConsumerRecord,
context: kafkaClientInterface.CursorContext
) extends RecordElement
private[backup] case object End extends RecordElement

/** An element after the record has been transformed to a ByteString
*/
private[backup] sealed trait ByteStringElement {
val data: ByteString
val context: kafkaClientInterface.CursorContext
}

private[backup] case class Start(override val data: ByteString,
override val context: kafkaClientInterface.CursorContext,
key: String
) extends ByteStringElement
private[backup] case class Tail(override val data: ByteString,
override val context: kafkaClientInterface.CursorContext
) extends ByteStringElement
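
As a rough illustration of the intended lifecycle (a self-contained sketch in plain Scala plus akka.util.ByteString; the names, key format and record rendering are hypothetical stand-ins, not the project's API): a Start element opens the JSON array and carries the storage key, Tail elements carry the remaining bytes, so concatenating the data fields in order yields one valid JSON document per time period.

object ByteStringElementSketch extends App {
  import akka.util.ByteString

  sealed trait ByteStringElement { def data: ByteString }
  final case class Start(data: ByteString, key: String) extends ByteStringElement
  final case class Tail(data: ByteString) extends ByteStringElement

  // One time period's worth of elements; the key is an illustrative filename
  val elements = List(
    Start(ByteString("""[{"offset":1},"""), key = "2022-01-01T00:00:00Z.json"),
    Tail(ByteString("""{"offset":2},""")),
    Tail(ByteString("""{"offset":3}]"""))
  )

  // Concatenating the data of every element reconstructs a complete JSON array
  println(elements.map(_.data.utf8String).mkString)
  // [{"offset":1},{"offset":2},{"offset":3}]
}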

/** Override this type to define the result of backing up data to a datasource
*/
type BackupResult
@@ -66,23 +71,60 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
*/
def empty: () => Future[BackupResult]

@nowarn("msg=not.*?exhaustive")
private[backup] def calculateBackupStreamPositions(
sourceWithPeriods: SourceWithContext[(ReducedConsumerRecord, Long),
kafkaClientInterface.CursorContext,
kafkaClientInterface.Control
]
  ): SourceWithContext[(ReducedConsumerRecord, BackupStreamPosition),
                       kafkaClientInterface.CursorContext,
                       kafkaClientInterface.Control
  ] = sourceWithPeriods
    .sliding(2)
    .map { case Seq((beforeReducedConsumerRecord, beforeDivisions), (_, afterDivisions)) =>
      val backupStreamPosition = splitAtBoundaryCondition(beforeDivisions, afterDivisions)
      (beforeReducedConsumerRecord, backupStreamPosition)
    }
    .mapContext { case Seq(cursorContext, _) => cursorContext }
  ): Source[RecordElement, kafkaClientInterface.Control] =
    sourceWithPeriods.asSource
      .prefixAndTail(2)
      // This algorithm only works with Sources that have 2 or more elements
      .flatMapConcat {
        case (Seq(
                ((firstReducedConsumerRecord, firstDivision), firstContext),
                ((secondReducedConsumerRecord, secondDivision), secondContext)
              ),
              rest
            ) =>
          val all = Source
            .combine(
              Source(
                List(
                  ((firstReducedConsumerRecord, firstDivision), firstContext),
                  ((secondReducedConsumerRecord, secondDivision), secondContext)
                )
              ),
              rest
            )(Concat(_))

val withDivisions =
all
.sliding(2)
.map {
case Seq(((_, beforeDivisions), _), ((afterReducedConsumerRecord, afterDivisions), afterContext)) =>
if (isAtBoundary(beforeDivisions, afterDivisions))
List(
End,
Element(afterReducedConsumerRecord, afterContext)
)
else
List(Element(afterReducedConsumerRecord, afterContext))
case rest =>
throw Errors.UnhandledStreamCase(rest)
}
.mapConcat(identity)

Source.combine(
Source.single(Element(firstReducedConsumerRecord, firstContext)),
withDivisions
)(Concat(_))
        // This case only occurs if a Source has a single element, so we just return it directly
case (Seq(((singleReducedConsumerRecord, _), singleContext)), _) =>
Source.single(Element(singleReducedConsumerRecord, singleContext))
case (rest, _) =>
throw Errors.UnhandledStreamCase(rest)
}
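
The fencepost handling above can be sketched with plain collections (hypothetical names, no Akka Streams): keep the first element as-is, then pair each record with its successor — the sliding(2) step — and emit an End marker whenever the period division increases.

object BoundaryMarkerSketch extends App {
  sealed trait RecordElement
  final case class Element(record: String, division: Long) extends RecordElement
  case object End extends RecordElement

  def withBoundaries(records: List[(String, Long)]): List[RecordElement] =
    records match {
      case Nil => Nil
      case (firstRecord, firstDivision) :: rest =>
        // Compare each record's division with its successor's (the sliding(2) step)
        val tail = records.zip(rest).flatMap { case ((_, before), (record, after)) =>
          if (after > before) List(End, Element(record, after))
          else List(Element(record, after))
        }
        Element(firstRecord, firstDivision) :: tail
    }

  // Two records in period 0 and one in period 1: End is emitted at the rollover
  println(withBoundaries(List(("a", 0L), ("b", 0L), ("c", 1L))))
  // List(Element(a,0), Element(b,0), End, Element(c,1))
}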

private[backup] def sourceWithPeriods(
source: Source[(OffsetDateTime, (ReducedConsumerRecord, kafkaClientInterface.CursorContext)),
@@ -107,12 +149,104 @@ trait BackupClientInterface[T <: KafkaClientInterface] {

Source.combine(
Source.single((firstTimestamp, (firstReducedConsumerRecord, firstCursorContext))),
rest.map { case (reducedConsumerRecord, context) => (firstTimestamp, (reducedConsumerRecord, context)) }
rest.map { case (reducedConsumerRecord, context) =>
(firstTimestamp, (reducedConsumerRecord, context))
}
)(Concat(_))
case None => throw Errors.ExpectedStartOfSource
}
}
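
The pairing visible above (the surrounding method is partially elided by the diff) anchors every record to the first record's timestamp, which fixes the period calculation for the whole stream. A collections-only sketch under that assumption, with hypothetical names:

object FirstTimestampSketch extends App {
  def withFirstTimestamp(records: List[(String, Long)]): List[(Long, String)] =
    records match {
      case (firstRecord, firstTimestamp) :: rest =>
        // Every subsequent record reuses the first record's timestamp
        (firstTimestamp, firstRecord) :: rest.map { case (record, _) => (firstTimestamp, record) }
      case Nil => sys.error("Expected start of source")
    }

  println(withFirstTimestamp(List(("a", 100L), ("b", 160L), ("c", 220L))))
  // List((100,a), (100,b), (100,c))
}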

/** Transforms a sequence of [[RecordElement]]s into a ByteString so that it can be persisted into the data storage
*
* @param sourceElements
* A sequence of [[RecordElement]]s as a result of `sliding(2)`
* @return
* a [[ByteString]] ready to be persisted along with the original context from the [[RecordElement]]
*/
private[backup] def transformReducedConsumerRecords(sourceElements: Seq[RecordElement]) = {
val stringWithContext = sourceElements match {
case Seq(Element(reducedConsumerRecord, context)) =>
// Happens in the sentinel case that is explicitly called at the start of the stream OR when a stream is interrupted by the user,
// in which case the stream needs to be terminated with `null]` in order to be valid
List((s"${reducedConsumerRecordAsString(reducedConsumerRecord)},", Some(context)))
case Seq(Element(firstReducedConsumerRecord, firstContext),
Element(secondReducedConsumerRecord, secondContext)
) =>
List(
(s"${reducedConsumerRecordAsString(firstReducedConsumerRecord)},", Some(firstContext)),
(s"${reducedConsumerRecordAsString(secondReducedConsumerRecord)},", Some(secondContext))
)
case Seq(Element(reducedConsumerRecord, context), End) =>
List((s"${reducedConsumerRecordAsString(reducedConsumerRecord)}]", Some(context)))
case Seq(End) =>
List(("]", None))
case rest => throw Errors.UnhandledStreamCase(rest)
}
stringWithContext.map { case (string, context) => (ByteString(string), context) }
}
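
The four cases map onto JSON fragments as follows — a minimal sketch mirroring the pattern match above (the record rendering and the Int context are simplified stand-ins):

object TransformCasesSketch extends App {
  sealed trait RecordElement
  final case class Element(offset: Int) extends RecordElement
  case object End extends RecordElement

  def asJson(e: Element): String = s"""{"offset":${e.offset}}"""

  def transform(window: Seq[RecordElement]): Seq[(String, Option[Int])] =
    window match {
      case Seq(e: Element)             => Seq((asJson(e) + ",", Some(e.offset)))
      case Seq(a: Element, b: Element) => Seq((asJson(a) + ",", Some(a.offset)), (asJson(b) + ",", Some(b.offset)))
      case Seq(e: Element, End)        => Seq((asJson(e) + "]", Some(e.offset)))
      case Seq(End)                    => Seq(("]", None))
      case other                       => sys.error(s"unhandled: $other")
    }

  println(transform(Seq(Element(1), Element(2)))) // mid-stream pair: both fragments end with ','
  println(transform(Seq(Element(3), End)))        // pair at a boundary: the array is closed with ']'
}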

private[backup] def dropCommaFromEndOfJsonArray(byteString: ByteString) =
byteString.dropRight(1)

/** Applies the transformation to the first element of a Stream so that it starts off as a JSON array.
*
* @param element
* Starting [[Element]]
* @param key
* The current key being processed
* @param terminate
* Whether to immediately terminate the JSON array for the single-element-in-stream case
* @return
* A [[List]] containing a single [[Start]] ready to be processed in the [[Sink]]
*/
private[backup] def transformFirstElement(element: Element, key: String, terminate: Boolean) =
transformReducedConsumerRecords(List(element)).map {
case (byteString, Some(context)) =>
if (terminate)
Start(ByteString("[") ++ dropCommaFromEndOfJsonArray(byteString) ++ ByteString("]"), context, key)
else
Start(ByteString("[") ++ byteString, context, key)
case _ =>
throw Errors.UnhandledStreamCase(List(element))
}
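
Sketched on plain strings (assuming, as in transformReducedConsumerRecords above, that a rendered element always ends with a trailing comma): prepend '[' to open the array, and when terminating, drop that comma and close immediately.

object FirstElementSketch extends App {
  def transformFirst(rendered: String, terminate: Boolean): String =
    if (terminate) "[" + rendered.dropRight(1) + "]" // dropRight(1) removes the trailing comma
    else "[" + rendered

  println(transformFirst("""{"offset":1},""", terminate = false)) // [{"offset":1},
  println(transformFirst("""{"offset":1},""", terminate = true))  // [{"offset":1}]
}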

/** Fixes the case where there is an odd number of elements in the stream
* @param head
* The head of the stream as a result of `prefixAndTail`
* @param restSource
* The rest of the stream as a result of `prefixAndTail`
* @return
* A [[List]] of ([[ByteString]], [[kafkaClientInterface.CursorContext]]) with the tail elements fixed up.
*/
private[backup] def transformTailingElement(
head: Seq[(ByteString, Option[kafkaClientInterface.CursorContext])],
restSource: Source[(ByteString, Option[kafkaClientInterface.CursorContext]), NotUsed]
) = {
val restTransformed = restSource
.sliding(2, step = 2)
.map {
case Seq((before, Some(context)), (after, None)) =>
List((dropCommaFromEndOfJsonArray(before) ++ after, context))
case Seq((before, Some(beforeContext)), (after, Some(afterContext))) =>
List((before, beforeContext), (after, afterContext))
case Seq((single, Some(context))) =>
List((single, context))
case rest =>
throw Errors.UnhandledStreamCase(rest)
}

head match {
case Seq((byteString, Some(cursorContext))) =>
Source.combine(
Source.single((List((byteString, cursorContext)))),
restTransformed
)(Concat(_))
case rest =>
throw Errors.UnhandledStreamCase(rest)
}
}
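
The sliding(2, step = 2) portion of this fix-up can be mirrored on plain values (Int standing in for the cursor context): a fragment whose right neighbour carries no context is the bare "]" produced by an End marker, so it is merged into the preceding fragment after dropping that fragment's trailing comma.

object TailingElementSketch extends App {
  def fixUp(pair: Seq[(String, Option[Int])]): Seq[(String, Int)] =
    pair match {
      case Seq((before, Some(context)), (after, None)) =>
        Seq((before.dropRight(1) + after, context)) // merge "...}," + "]" into "...}]"
      case Seq((before, Some(beforeContext)), (after, Some(afterContext))) =>
        Seq((before, beforeContext), (after, afterContext))
      case Seq((single, Some(context))) =>
        Seq((single, context))
      case other =>
        sys.error(s"unhandled: $other")
    }

  println(fixUp(Seq(("""{"offset":9},""", Some(9)), ("]", None))))
  // List(({"offset":9}],9))
}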

/** The entire flow that involves reading from Kafka, transforming the data into JSON and then backing it up into a
* data source.
* @return
@@ -127,56 +261,71 @@ trait BackupClientInterface[T <: KafkaClientInterface] {

val withBackupStreamPositions = calculateBackupStreamPositions(sourceWithPeriods(sourceWithFirstRecord))

val split = withBackupStreamPositions.asSource.splitAfter { case ((_, backupStreamPosition), _) =>
backupStreamPosition == BackupStreamPosition.Boundary
}
val split = withBackupStreamPositions
.splitAfter { case sourceElement =>
sourceElement match {
case End => true
case _ => false
}
}
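
Roughly what splitAfter does here, sketched with plain lists (hypothetical names): every End marker closes the current chunk, so each time period ends up in its own substream — its own List in this sketch.

object SplitAfterSketch extends App {
  sealed trait RecordElement
  final case class Element(offset: Int) extends RecordElement
  case object End extends RecordElement

  def splitAfterEnd(in: List[RecordElement]): List[List[RecordElement]] =
    in.foldLeft(List(List.empty[RecordElement])) { (acc, elem) =>
      elem match {
        case End => acc.init :+ (acc.last :+ End) :+ Nil // End closes the current chunk
        case e   => acc.init :+ (acc.last :+ e)
      }
    }.filter(_.nonEmpty)

  println(splitAfterEnd(List(Element(1), Element(2), End, Element(3))))
  // List(List(Element(1), Element(2), End), List(Element(3)))
}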

val substreams = split
      .prefixAndTail(1)
      .flatMapConcat { case (head, restOfReducedConsumerRecords) =>
        head.headOption match {
          case Some(((firstReducedConsumerRecord, _), firstContext)) =>
            // We need to retrieve the first element of the stream in order to calculate the key/filename
            val key = calculateKey(firstReducedConsumerRecord.toOffsetDateTime)

            // Now that we have retrieved the first element of the stream, let's recombine it so we create the
            // original stream
            val combined = Source.combine(
              Source.single(
                (
                  (firstReducedConsumerRecord, BackupStreamPosition.Start),
                  firstContext
                )
              ),
              restOfReducedConsumerRecords
            )(Concat(_))

            // Go through every element in the stream and convert the `ReducedConsumerRecord` to an actual bytestream
            val transformed = combined.map { case ((record, position), context) =>
              val transform = transformReducedConsumerRecords(record, position)
              (transform, context)
            }

            transformed.map(data => (data, key))
          case None => Source.empty
        }
      .prefixAndTail(2)
      .flatMapConcat {
        case (Seq(only: Element, End), _) =>
          // This case only occurs when you have a single element in a timeslice.
          // We have to terminate immediately to create a JSON array with a single element
          val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime)
          Source(transformFirstElement(only, key, terminate = true))
        case (Seq(first: Element, second: Element), restOfReducedConsumerRecords) =>
          val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime)
          val firstSource = transformFirstElement(first, key, terminate = false)

          val rest = Source.combine(
            Source.single(second),
            restOfReducedConsumerRecords
          )(Concat(_))

          val restTransformed = rest
            .sliding(2, step = 2)
            .map(transformReducedConsumerRecords)
            .mapConcat(identity)
            .prefixAndTail(1)
            .flatMapConcat((transformTailingElement _).tupled)
            .mapConcat(identity)
            .map { case (byteString, context) => Tail(byteString, context) }

          Source.combine(
            Source(
              firstSource
            ),
            restTransformed
          )(Concat(_))
        case (Seq(only: Element), _) =>
          // This case can also occur when the user terminates the stream
          val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime)
          Source(transformFirstElement(only, key, terminate = false))
        case (rest, _) =>
          throw Errors.UnhandledStreamCase(rest)
      }

// Note that .alsoTo triggers after .to, see https://stackoverflow.com/questions/47895991/multiple-sinks-in-the-same-stream#comment93028098_47896071
@nowarn("msg=method lazyInit in object Sink is deprecated")
val subFlowSink = substreams
.alsoTo(kafkaClientInterface.commitCursor.contramap[((ByteString, kafkaClientInterface.CursorContext), String)] {
case ((_, context), _) => context
.alsoTo(kafkaClientInterface.commitCursor.contramap[ByteStringElement] { byteStringElement =>
byteStringElement.context
})
.to(
// See https://stackoverflow.com/questions/68774425/combine-prefixandtail1-with-sink-lazysink-for-subflow-created-by-splitafter/68776660?noredirect=1#comment121558518_68776660
Sink.lazyInit(
{ case (_, key) =>
Future.successful(
backupToStorageSink(key).contramap[((ByteString, kafkaClientInterface.CursorContext), String)] {
case ((byteString, _), _) => byteString
}
)
{
case start: Start =>
Future.successful(
backupToStorageSink(start.key).contramap[ByteStringElement] { byteStringElement =>
byteStringElement.data
}
)
case _ => throw Errors.ExpectedStartOfSource
},
empty
)
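
The wiring can be pictured on plain collections (a sketch, not the Akka API): the commit branch (.alsoTo) sees every element's context, while the storage branch materialises lazily, once the leading Start element — and therefore the key — is known.

object LazySinkSketch extends App {
  sealed trait ByteStringElement { def data: String; def context: Int }
  final case class Start(data: String, context: Int, key: String) extends ByteStringElement
  final case class Tail(data: String, context: Int) extends ByteStringElement

  // The commit branch receives the context of every element
  def commitCursor(substream: List[ByteStringElement]): List[Int] = substream.map(_.context)

  // The storage branch requires the first element to be a Start, which supplies the key
  def backupToStorage(substream: List[ByteStringElement]): (String, String) =
    substream match {
      case (start: Start) :: _ => (start.key, substream.map(_.data).mkString)
      case _                   => sys.error("Expected Start of source")
    }

  val substream = List(Start("""[{"offset":1},""", 1, "period-1.json"), Tail("""{"offset":2}]""", 2))
  println(commitCursor(substream))    // List(1, 2)
  println(backupToStorage(substream)) // (period-1.json,[{"offset":1},{"offset":2}])
}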
@@ -202,44 +351,22 @@ object BackupClientInterface {
def calculateKey(offsetDateTime: OffsetDateTime): String =
s"${BackupClientInterface.formatOffsetDateTime(offsetDateTime)}.json"

/** Calculates the current position in a 2-element sliding window of a Stream.
/** Calculates whether we have rolled over a time period given the number of divided periods.
* @param dividedPeriodsBefore
* The number of divided periods in the first element of the slide. -1 is used as a sentinel value to indicate the
* start of the stream
* @param dividedPeriodsAfter
* The number of divided periods in the second element of the slide
* @return
* The position of the Stream
* `true` if we have hit a time boundary otherwise `false`
*/
def splitAtBoundaryCondition(dividedPeriodsBefore: Long, dividedPeriodsAfter: Long): BackupStreamPosition =
def isAtBoundary(dividedPeriodsBefore: Long, dividedPeriodsAfter: Long): Boolean =
(dividedPeriodsBefore, dividedPeriodsAfter) match {
case (before, after) if after > before =>
BackupStreamPosition.Boundary
true
      case _ =>
        BackupStreamPosition.Middle
        false
    }

  /** Transforms a `ReducedConsumer` record into a ByteString so that it can be persisted into the data storage
    * @param reducedConsumerRecord
    *   The ReducedConsumerRecord to persist
    * @param backupStreamPosition
    *   The relative position of the record in the stream (so it knows whether it's at the start, middle or end)
    * @return
    *   a `ByteString` ready to be persisted
    */
  def transformReducedConsumerRecords(reducedConsumerRecord: ReducedConsumerRecord,
                                      backupStreamPosition: BackupStreamPosition
  ): ByteString = {
    val string = backupStreamPosition match {
      case BackupStreamPosition.Start =>
        s"[${reducedConsumerRecordAsString(reducedConsumerRecord)},"
      case BackupStreamPosition.Middle =>
        s"${reducedConsumerRecordAsString(reducedConsumerRecord)},"
      case BackupStreamPosition.Boundary =>
        s"${reducedConsumerRecordAsString(reducedConsumerRecord)}]"
    }
    ByteString(string)
  }
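
A worked example, under the assumption that a division is the elapsed time since the initial timestamp divided by the period length (compare calculateNumberOfPeriodsFromTimestamp below): with 60-minute periods, minute 59 and minute 61 fall into divisions 0 and 1, so the boundary check fires.

object IsAtBoundarySketch extends App {
  def isAtBoundary(dividedPeriodsBefore: Long, dividedPeriodsAfter: Long): Boolean =
    dividedPeriodsAfter > dividedPeriodsBefore

  // Assumed division rule: minutes elapsed since the initial timestamp / period length
  def division(minutesSinceStart: Long, periodMinutes: Long): Long =
    minutesSinceStart / periodMinutes

  println(isAtBoundary(division(59, 60), division(61, 60))) // true: rolled into a new period
  println(isAtBoundary(division(10, 60), division(59, 60))) // false: same period
}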

protected def calculateNumberOfPeriodsFromTimestamp(initialTime: OffsetDateTime,
period: FiniteDuration,
