Fix fencepost issues with stream algorithm plus fix tests #76

Merged

merged 2 commits into from Jan 4, 2022

Changes from 1 commit
@@ -1,5 +1,6 @@
package io.aiven.guardian.kafka.backup

import akka.NotUsed
import akka.stream.scaladsl._
import akka.util.ByteString
import io.aiven.guardian.kafka.Errors
@@ -17,25 +18,6 @@ import java.time._
import java.time.format.DateTimeFormatter
import java.time.temporal._

/** A marker used to indicate in which position the current backup stream is
*/
sealed abstract class BackupStreamPosition

object BackupStreamPosition {

/** The backup stream has just started right now
*/
case object Start extends BackupStreamPosition

/** The backup stream is in the middle of a time period
*/
case object Middle extends BackupStreamPosition

/** The backup stream position has just hit a boundary for when a new period starts
*/
case object Boundary extends BackupStreamPosition
}

/** An interface for a template on how to backup a Kafka Stream into some data storage
* @tparam T
* The underlying `kafkaClientInterface` type
@@ -44,6 +26,29 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
implicit val kafkaClientInterface: T
implicit val backupConfig: Backup

/** An element from the original record
*/
private[backup] sealed trait RecordElement
private[backup] case class Element(reducedConsumerRecord: ReducedConsumerRecord,
context: kafkaClientInterface.CursorContext
) extends RecordElement
private[backup] case object End extends RecordElement

/** An element after the record has been transformed to a ByteString
*/
private[backup] sealed trait ByteStringElement {
val data: ByteString
val context: kafkaClientInterface.CursorContext
}

private[backup] case class Start(override val data: ByteString,
override val context: kafkaClientInterface.CursorContext,
key: String
) extends ByteStringElement
private[backup] case class Tail(override val data: ByteString,
override val context: kafkaClientInterface.CursorContext
) extends ByteStringElement

/** Override this type to define the result of backing up data to a datasource
*/
type BackupResult
@@ -66,23 +71,60 @@ trait BackupClientInterface[T <: KafkaClientInterface] {
*/
def empty: () => Future[BackupResult]

@nowarn("msg=not.*?exhaustive")
private[backup] def calculateBackupStreamPositions(
sourceWithPeriods: SourceWithContext[(ReducedConsumerRecord, Long),
kafkaClientInterface.CursorContext,
kafkaClientInterface.Control
]
): SourceWithContext[(ReducedConsumerRecord, BackupStreamPosition),
kafkaClientInterface.CursorContext,
kafkaClientInterface.Control
] = sourceWithPeriods
.sliding(2)
.map { case Seq((beforeReducedConsumerRecord, beforeDivisions), (_, afterDivisions)) =>
val backupStreamPosition = splitAtBoundaryCondition(beforeDivisions, afterDivisions)

(beforeReducedConsumerRecord, backupStreamPosition)
}
.mapContext { case Seq(cursorContext, _) => cursorContext }
): Source[RecordElement, kafkaClientInterface.Control] =
sourceWithPeriods.asSource
.prefixAndTail(2)
// This algorithm only works with Sources that have 2 or more elements
.flatMapConcat {
case (Seq(
((firstReducedConsumerRecord, firstDivision), firstContext),
((secondReducedConsumerRecord, secondDivision), secondContext)
),
rest
) =>
val all = Source
.combine(
Source(
List(
((firstReducedConsumerRecord, firstDivision), firstContext),
((secondReducedConsumerRecord, secondDivision), secondContext)
)
),
rest
)(Concat(_))
val withDivisions =
all
.sliding(2)
.map {
case Seq(((_, beforeDivisions), _), ((afterReducedConsumerRecord, afterDivisions), afterContext)) =>
if (isAtBoundary(beforeDivisions, afterDivisions))
List(
End,
Element(afterReducedConsumerRecord, afterContext)
)
else
List(Element(afterReducedConsumerRecord, afterContext))
case rest =>
throw Errors.UnhandledStreamCase(rest)
}
.mapConcat(identity)

Source.combine(
Source.single(Element(firstReducedConsumerRecord, firstContext)),
withDivisions
)(Concat(_))
// This case only occurs if a Source has a single element so we just directly return it
case (Seq(((singleReducedConsumerRecord, _), singleContext)), _) =>
Source.single(Element(singleReducedConsumerRecord, singleContext))
case (rest, _) =>
throw Errors.UnhandledStreamCase(rest)
}
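The method above relies on a peel-and-recombine pattern: `prefixAndTail(2)` pulls the first elements off the stream so they can be inspected, and `Source.combine(...)(Concat(_))` stitches them back in front of the remainder so downstream stages still see every element. A minimal runnable sketch of that pattern (names and values are illustrative, not from the PR):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Concat, Sink, Source}

object PrefixAndTailSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  Source(1 to 5)
    .prefixAndTail(1)                      // peel off the first element
    .flatMapConcat { case (head, tail) =>
      // ... inspect `head` here (e.g. to derive a key or emit a marker) ...
      Source.combine(Source(head), tail)(Concat(_)) // stitch the stream back together
    }
    .runWith(Sink.foreach(println))        // prints 1..5 unchanged
    .onComplete(_ => system.terminate())(system.dispatcher)
}
```

`Concat` preserves ordering, which matters here because the later `sliding(2)` window assumes elements arrive in their original order.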

private[backup] def sourceWithPeriods(
source: Source[(OffsetDateTime, (ReducedConsumerRecord, kafkaClientInterface.CursorContext)),
@@ -107,12 +149,101 @@ trait BackupClientInterface[T <: KafkaClientInterface] {

Source.combine(
Source.single((firstTimestamp, (firstReducedConsumerRecord, firstCursorContext))),
rest.map { case (reducedConsumerRecord, context) => (firstTimestamp, (reducedConsumerRecord, context)) }
rest.map { case (reducedConsumerRecord, context) =>
(firstTimestamp, (reducedConsumerRecord, context))
}
)(Concat(_))
case None => throw Errors.ExpectedStartOfSource
}
}

/** Transforms a sequence of [[RecordElement]]'s into a ByteString so that it can be persisted into the data storage
*
* @param sourceElements
* A sequence of [[RecordElement]]'s as a result of `sliding(2)`
* @return
* a [[ByteString]] ready to be persisted along with the original context from the [[RecordElement]]
*/
private[backup] def transformReducedConsumerRecords(sourceElements: Seq[RecordElement]) = {
val stringWithContext = sourceElements match {
case Seq(Element(reducedConsumerRecord, context)) =>
// Happens in Sentinel case that is explicitly called at start of stream OR when a stream is interrupted by the user
// in which case stream needs to be terminated with `null]` in order to be valid
List((s"${reducedConsumerRecordAsString(reducedConsumerRecord)},", Some(context)))
case Seq(Element(firstReducedConsumerRecord, firstContext),
Element(secondReducedConsumerRecord, secondContext)
) =>
List(
(s"${reducedConsumerRecordAsString(firstReducedConsumerRecord)},", Some(firstContext)),
(s"${reducedConsumerRecordAsString(secondReducedConsumerRecord)},", Some(secondContext))
)
case Seq(Element(reducedConsumerRecord, context), End) =>
List((s"${reducedConsumerRecordAsString(reducedConsumerRecord)}]", Some(context)))
case Seq(End) =>
List(("]", None))
case rest => throw Errors.UnhandledStreamCase(rest)
}
stringWithContext.map { case (string, context) => (ByteString(string), context) }
}
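To see why these fragments concatenate into valid JSON, here is a hypothetical trace of one time slice (record contents invented): the first element opens the array, every intermediate element carries a trailing comma, and the element paired with `End` closes the array instead.

```scala
// Hypothetical fragments for a slice of three records:
val start  = """[{"offset":0},"""  // transformFirstElement: '[' + record + ','
val middle = """{"offset":1},"""   // Element: record + ','
val end    = """{"offset":2}]"""   // Element followed by End: record + ']'

println(start + middle + end)      // [{"offset":0},{"offset":1},{"offset":2}]
```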

/** Applies the transformation to the first element of a Stream so that it starts off as a JSON array.
*
* @param element
* Starting [[Element]]
* @param key
* The current key being processed
* @param terminate
* Whether to immediately terminate the JSON array for single element in Stream case
* @return
* A [[List]] containing a single [[Start]] ready to be processed in the [[Sink]]
*/
private[backup] def transformFirstElement(element: Element, key: String, terminate: Boolean) =
transformReducedConsumerRecords(List(element)).map {
case (byteString, Some(context)) =>
if (terminate)
Start(ByteString("[") ++ byteString.dropRight(1) ++ ByteString("]"), context, key)
Contributor:

what's on that 1 byte that you drop?

Contributor Author (@mdedetrich), Jan 4, 2022:

It's a `,` (comma). transformReducedConsumerRecords adds a comma as the last character, which is the standard case for a stream, but this is a corner case where we terminate immediately, so we drop the `,` so that we have

...}]

instead of

...},]

(which is not valid JSON)

Do you want me to add a comment?

Contributor:

Either that, or store this in a val with the name strippedTrailingComma. That would be cool. I think this is one of the things that is quickly forgotten.

else
Start(ByteString("[") ++ byteString, context, key)
case _ =>
throw Errors.UnhandledStreamCase(List(element))
}
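The `dropRight(1)` discussed in the review thread above strips the trailing comma that `transformReducedConsumerRecords` always appends. A small sketch of the single-element termination case (record content invented), using the `strippedTrailingComma` naming the reviewer suggested:

```scala
import akka.util.ByteString

val record                = ByteString("""{"offset":0},""") // generic transform leaves a trailing ','
val strippedTrailingComma = record.dropRight(1)             // remove the ',' before closing

val terminated = ByteString("[") ++ strippedTrailingComma ++ ByteString("]")
println(terminated.utf8String) // [{"offset":0}] -- valid JSON; "[{...},]" would not be
```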

/** Fixes the case where there is an odd number of elements in the stream
* @param head
* of stream as a result of `prefixAndTail`
* @param restSource
* of the stream as a result of `prefixAndTail`
* @return
* A [[List]] of ([[ByteString]], [[kafkaClientInterface.CursorContext]]) with the tail elements fixed up.
*/
private[backup] def transformTailingElement(
head: Seq[(ByteString, Option[kafkaClientInterface.CursorContext])],
restSource: Source[(ByteString, Option[kafkaClientInterface.CursorContext]), NotUsed]
) = {
val restTransformed = restSource
.sliding(2, step = 2)
.map {
case Seq((before, Some(context)), (after, None)) =>
List((before.dropRight(1) ++ after, context))
case Seq((before, Some(beforeContext)), (after, Some(afterContext))) =>
List((before, beforeContext), (after, afterContext))
case Seq((single, Some(context))) =>
List((single, context))
case rest =>
throw Errors.UnhandledStreamCase(rest)
}

head match {
case Seq((byteString, Some(cursorContext))) =>
Source.combine(
Source.single((List((byteString, cursorContext)))),
restTransformed
)(Concat(_))
case rest =>
throw Errors.UnhandledStreamCase(rest)
}
}
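This method exists because `sliding(2, step = 2)` pairs elements two at a time but emits a final singleton group when the element count is odd; that trailing group is what the `Seq((single, Some(context)))` case above absorbs. Illustrated with plain Scala collections (the stream stage groups the same way here, as the single-element case in the code confirms):

```scala
List(1, 2, 3, 4).sliding(2, 2).toList // List(List(1, 2), List(3, 4)) -- even: clean pairs
List(1, 2, 3).sliding(2, 2).toList    // List(List(1, 2), List(3))    -- odd: trailing singleton
```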

/** The entire flow that involves reading from Kafka, transforming the data into JSON and then backing it up into a
* data source.
* @return
@@ -127,56 +258,71 @@ trait BackupClientInterface[T <: KafkaClientInterface] {

val withBackupStreamPositions = calculateBackupStreamPositions(sourceWithPeriods(sourceWithFirstRecord))

val split = withBackupStreamPositions.asSource.splitAfter { case ((_, backupStreamPosition), _) =>
backupStreamPosition == BackupStreamPosition.Boundary
}
val split = withBackupStreamPositions
.splitAfter { case sourceElement =>
sourceElement match {
case End => true
case _ => false
}
}
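`splitAfter` ends the current substream with the element that matched, so each `End` marker closes out one time slice and the next record opens a fresh substream. A runnable sketch with strings standing in for `RecordElement`s:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

object SplitAfterSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  Source(List("a", "b", "END", "c", "d", "END"))
    .splitAfter(_ == "END")          // "END" is the last element of its substream
    .fold("")(_ + _)                 // collapse each substream to show its contents
    .mergeSubstreams
    .runWith(Sink.foreach(println))  // prints abEND, then cdEND
    .onComplete(_ => system.terminate())(system.dispatcher)
}
```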

val substreams = split
.prefixAndTail(1)
.flatMapConcat { case (head, restOfReducedConsumerRecords) =>
head.headOption match {
case Some(((firstReducedConsumerRecord, _), firstContext)) =>
// We need to retrieve the first element of the stream in order to calculate the key/filename
val key = calculateKey(firstReducedConsumerRecord.toOffsetDateTime)

// Now that we have retrieved the first element of the stream, lets recombine it so we create the
// original stream
val combined = Source.combine(
Source.single(
(
(firstReducedConsumerRecord, BackupStreamPosition.Start),
firstContext
)
),
restOfReducedConsumerRecords
)(Concat(_))

// Go through every element in the stream and convert the `ReducedConsumerRecord` to an actual bytestream
val transformed = combined.map { case ((record, position), context) =>
val transform = transformReducedConsumerRecords(record, position)
(transform, context)
}

transformed.map(data => (data, key))
case None => Source.empty
}
}
.prefixAndTail(2)
.flatMapConcat {
case (Seq(only: Element, End), _) =>
// This case only occurs when you have a single element in a timeslice.
// We have to terminate immediately to create a JSON array with a single element
val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime)
Source(transformFirstElement(only, key, terminate = true))
case (Seq(first: Element, second: Element), restOfReducedConsumerRecords) =>
val key = calculateKey(first.reducedConsumerRecord.toOffsetDateTime)
val firstSource = transformFirstElement(first, key, terminate = false)

val rest = Source.combine(
Source.single(second),
restOfReducedConsumerRecords
)(Concat(_))

val restTransformed = rest
.sliding(2, step = 2)
.map(transformReducedConsumerRecords)
.mapConcat(identity)
.prefixAndTail(1)
.flatMapConcat((transformTailingElement _).tupled)
.mapConcat(identity)
.map { case (byteString, context) => Tail(byteString, context) }

Source.combine(
Source(firstSource),
restTransformed
)(Concat(_))
case (Seq(only: Element), _) =>
// This case can also occur when user terminates the stream
val key = calculateKey(only.reducedConsumerRecord.toOffsetDateTime)
Source(transformFirstElement(only, key, terminate = false))
case (rest, _) =>
throw Errors.UnhandledStreamCase(rest)
}

// Note that .alsoTo triggers after .to, see https://stackoverflow.com/questions/47895991/multiple-sinks-in-the-same-stream#comment93028098_47896071
@nowarn("msg=method lazyInit in object Sink is deprecated")
val subFlowSink = substreams
.alsoTo(kafkaClientInterface.commitCursor.contramap[((ByteString, kafkaClientInterface.CursorContext), String)] {
case ((_, context), _) => context
.alsoTo(kafkaClientInterface.commitCursor.contramap[ByteStringElement] { byteStringElement =>
byteStringElement.context
})
.to(
// See https://stackoverflow.com/questions/68774425/combine-prefixandtail1-with-sink-lazysink-for-subflow-created-by-splitafter/68776660?noredirect=1#comment121558518_68776660
Sink.lazyInit(
{ case (_, key) =>
Future.successful(
backupToStorageSink(key).contramap[((ByteString, kafkaClientInterface.CursorContext), String)] {
case ((byteString, _), _) => byteString
}
)
{
case start: Start =>
Future.successful(
backupToStorageSink(start.key).contramap[ByteStringElement] { byteStringElement =>
byteStringElement.data
}
)
case _ => throw Errors.ExpectedStartOfSource
},
empty
)
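`Sink.lazyInit` (deprecated, as the `@nowarn` above notes; later Akka versions point to `Sink.lazyFutureSink`) defers creating the real sink until the first element arrives, which is what lets the storage key come from the stream itself. A minimal sketch of that pattern, with simplified types and invented names:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.annotation.nowarn
import scala.concurrent.Future

object LazySinkSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  @nowarn("msg=method lazyInit in object Sink is deprecated")
  val keyedSink: Sink[String, Future[Future[Done]]] =
    Sink.lazyInit(
      // Build the real sink from the first element, like backupToStorageSink(start.key)
      first => Future.successful(Sink.foreach[String](s => println(s"[key=$first] $s"))),
      // Fallback materialized value if the stream is empty, playing the role of `empty`
      () => Future.successful(Done)
    )

  Source(List("2022-01-04.json", "record-1", "record-2")).runWith(keyedSink)
}
```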
@@ -202,44 +348,22 @@ object BackupClientInterface {
def calculateKey(offsetDateTime: OffsetDateTime): String =
s"${BackupClientInterface.formatOffsetDateTime(offsetDateTime)}.json"

/** Calculates the current position in 2 element sliding of a Stream.
/** Calculates whether we have rolled over a time period given number of divided periods.
* @param dividedPeriodsBefore
* The number of divided periods in the first element of the slide. -1 is used as a sentinel value to indicate the
* start of the stream
* @param dividedPeriodsAfter
* The number of divided periods in the second element of the slide
* @return
* The position of the Stream
* `true` if we have hit a time boundary otherwise `false`
*/
def splitAtBoundaryCondition(dividedPeriodsBefore: Long, dividedPeriodsAfter: Long): BackupStreamPosition =
def isAtBoundary(dividedPeriodsBefore: Long, dividedPeriodsAfter: Long): Boolean =
(dividedPeriodsBefore, dividedPeriodsAfter) match {
case (before, after) if after > before =>
BackupStreamPosition.Boundary
true
case _ =>
BackupStreamPosition.Middle
false
}

/** Transforms a `ReducedConsumer` record into a ByteString so that it can be persisted into the data storage
* @param reducedConsumerRecord
* The ReducedConsumerRecord to persist
* @param backupStreamPosition
* The position of the record relative in the stream (so it knows if it's at the start, middle or end)
* @return
* a `ByteString` ready to be persisted
*/
def transformReducedConsumerRecords(reducedConsumerRecord: ReducedConsumerRecord,
backupStreamPosition: BackupStreamPosition
): ByteString = {
val string = backupStreamPosition match {
case BackupStreamPosition.Start =>
s"[${reducedConsumerRecordAsString(reducedConsumerRecord)},"
case BackupStreamPosition.Middle =>
s"${reducedConsumerRecordAsString(reducedConsumerRecord)},"
case BackupStreamPosition.Boundary =>
s"${reducedConsumerRecordAsString(reducedConsumerRecord)}]"
}
ByteString(string)
}
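To make the boundary check concrete: the divided-period counts compared by `isAtBoundary` are the whole number of `period`s elapsed since the initial timestamp, so the count increments exactly when a record crosses into a new time slice. The arithmetic below is an assumed reconstruction for illustration, since the body of `calculateNumberOfPeriodsFromTimestamp` is truncated here:

```scala
import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit

import scala.concurrent.duration._

object BoundarySketch extends App {
  val period      = 1.hour
  val initialTime = OffsetDateTime.parse("2022-01-04T00:00:00Z")

  // Assumed period arithmetic: whole periods elapsed since the initial time
  def dividedPeriods(ts: OffsetDateTime): Long =
    ChronoUnit.MILLIS.between(initialTime, ts) / period.toMillis

  val before = dividedPeriods(OffsetDateTime.parse("2022-01-04T00:59:00Z")) // 0
  val after  = dividedPeriods(OffsetDateTime.parse("2022-01-04T01:01:00Z")) // 1

  println(after > before) // true -> isAtBoundary(0, 1) would report a boundary
}
```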

protected def calculateNumberOfPeriodsFromTimestamp(initialTime: OffsetDateTime,
period: FiniteDuration,