Fix fencepost issues with stream algorithm plus fix tests #76
Conversation
I have some questions and minor comments. Feel free to merge if this is blocking you; we can discuss my minor comments afterwards.
Two resolved review threads on core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala
transformReducedConsumerRecords(List(element)).map {
  case (byteString, Some(context)) =>
    if (terminate)
      Start(ByteString("[") ++ byteString.dropRight(1) ++ ByteString("]"), context, key)
What's in that 1 byte that you drop?
It's a `,` (comma). transformReducedConsumerRecords appends a comma as the last character, which is the standard case for an ongoing stream, but this is a corner case where we terminate immediately, so we drop the `,` so that we end up with `...}]` instead of `..},]` (which is not valid JSON). Do you want me to add a comment?
Either that, or store this in a val named `strippedTrailingComma`. That would be cool; I think this is one of the things that is quickly forgotten.
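For reference, a minimal sketch of what that naming could look like (the record payload below is made up; only the `ByteString` handling is the point):

```scala
import akka.util.ByteString

object StrippedTrailingCommaSketch extends App {
  // Illustrative output of transformReducedConsumerRecords for a single record;
  // the real payload differs, but it always ends with the streaming-case comma.
  val byteString = ByteString("""{"topic":"t","partition":0,"offset":1},""")

  // Naming the intermediate value, as suggested, makes the intent obvious:
  val strippedTrailingComma = byteString.dropRight(1)
  val terminatedJson = ByteString("[") ++ strippedTrailingComma ++ ByteString("]")

  // Prints [{"topic":"t","partition":0,"offset":1}] — valid JSON, unlike the
  // [{"topic":"t","partition":0,"offset":1},] we would get without the drop.
  println(terminatedJson.utf8String)
}
```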
Resolved (outdated) review thread on core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala
LGTM. Feel free to add the comment or a variable for the trailing comma removal
LGTM
About this change - What it does
This PR fixes the outstanding fencepost issues in the streaming algorithm. It should fix most (very likely all) of the problems identified in #75.
The PR also fixes regressions in the tests (which meant that the mock tests weren't actually testing anything properly) and adds additional tests, including one that critically makes sure that JSON termination works correctly, which is important for the restore part of the tool.
Why this way
There are multiple issues that this PR addresses. One of the reasons the PR is non-trivial in size is that the initial intention was only to fix the broken tests, which, once resolved, revealed other problems. A summary is provided here:
- `backedUpData` in the test mocks was a `ConcurrentLinkedQueue` that was converted to Scala using the standard `asScala()` conversion. Unfortunately I didn't realize that this conversion also changed the data structure from a mutable one to an immutable one, which meant that adding an element to the collection using `backedUpData ++ Iterable((key, byteString))` did nothing, since the resulting collection was immediately discarded. This was solved by just using the original Java `ConcurrentLinkedQueue` collection and its `.add` method (see the sketch after this list).
- Since the algorithm relies on `.sliding(2)`, explicit tests were added for streams of size `1` and `2`. Although the property tests generally cover this case, it's more ergonomic to test these sizes explicitly so that a regression is picked up reliably.
- `mergeBackedUpData` was modified so that it no longer loses key insertion order (which `immutable.Map` does not preserve). Extra parameters were also added to `mergeBackedUpData` to make the testing more reliable.
- A `sort` parameter was added after realizing that in some cases the stream is processed out of order, which was causing the `backup method completes flow correctly for two elements` test to fail non-deterministically due to ordering issues (this occurred even when trying to throttle the source). In reality, with an actual backup source this should be a non-issue.
- Tests need to wait until all of the elements of the `Source` have been emitted; however, one of the properties of using substreams as a result of `groupBy`/`splitWhen`/`splitAfter` is that the stream runs in perpetuity. Due to this, `AkkaStreamInitializationConstant` has been added, which is a time constant that the related tests need to wait for.
- `.sliding` behaves differently for streams with an even number of elements versus an odd number, depending on the `step` parameter. Note the difference between `prefixAndTail(2)` and `prefixAndTail(1)`: we need to detect whether we are dealing with a stream that only has a single element and handle it explicitly (see the `prefixAndTail` sketch after this list).
- Rather than pairing every `ReducedConsumerRecord` in the stream with the original `Boundary`, this PR changes it so that boundaries work closer to tombstones. This made the handling of corner cases much easier.
- For `ByteStringElement`, rather than having the `key` in every element of the stream, it is now only in the `Start` (which is where it's needed for `Sink.lazyInit`); every further element in the stream for this time slice carries only the needed data/context, represented as `Tail` (a sketch of this shape follows the list). `Alpakka` also inserts watermarks as `case object`s when detecting various boundaries.
- `Errors.UnhandledStreamCase` is thrown along with the `rest` case to better diagnose problems.
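For illustration, here is a minimal sketch of the collection problem from the first point above (names and payloads are made up, not the actual test code; the sketch assumes Scala 2.13's `scala.jdk.CollectionConverters`):

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.jdk.CollectionConverters._
import akka.util.ByteString

object MockCollectionSketch extends App {
  // The queue the mock keeps (illustrative, not the actual test code).
  val backedUpData = new ConcurrentLinkedQueue[(String, ByteString)]()

  // `asScala` merely wraps the queue as a scala.collection.Iterable; `++` then
  // builds a brand new collection whose reference is discarded, so nothing is stored.
  val discarded = backedUpData.asScala ++ Iterable(("key", ByteString("payload")))
  assert(backedUpData.isEmpty)

  // Calling `.add` on the original Java collection mutates it, which is what we want.
  backedUpData.add(("key", ByteString("payload")))
  assert(!backedUpData.isEmpty)
}
```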
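The single-element handling mentioned in the `prefixAndTail` point boils down to something like the following sketch (hypothetical names, not the actual `BackupClientInterface` code):

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

object SingleElementSketch {
  // prefixAndTail(2) peels off up to two elements: if fewer than two arrive,
  // the stream has a single element (or none) and needs explicit handling;
  // otherwise the prefix and tail are reassembled and passed through untouched.
  def handleSingleElement[T](terminateImmediately: T => T): Flow[T, T, NotUsed] =
    Flow[T].prefixAndTail(2).flatMapConcat {
      case (Seq(only), _) => Source.single(terminateImmediately(only)) // corner case: exactly one element
      case (prefix, tail) => Source(prefix.toList) ++ tail             // zero or two-plus elements
    }
}
```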
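And a sketch of the `ByteStringElement` shape described above, with the context type replaced by a placeholder (the actual definitions in `BackupClientInterface` may differ):

```scala
import akka.util.ByteString

object ByteStringElementSketch {
  // Placeholder for the per-record cursor/commit context used by the real code.
  trait Context

  sealed trait ByteStringElement {
    def data: ByteString
    def context: Context
  }

  // Start opens a time slice and is the only element carrying the key,
  // since that is what Sink.lazyInit needs to create the backup object.
  final case class Start(data: ByteString, context: Context, key: String) extends ByteStringElement

  // Every further element of the same time slice only carries data and context.
  final case class Tail(data: ByteString, context: Context) extends ByteStringElement
}
```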
To make sure that these tests are no longer flaky, I ran the tests in a loop (using https://stackoverflow.com/a/38283446/1519631) for an entire day to verify that not a single test failed. Ideally, once this gets merged, rebasing these changes onto #75 by itself should fix all of the remaining problems with that PR and finalize the backup portion of the tool.

As a final note, it is definitely plausible that there is a better way of handling all of the streaming corner cases. However, as far as I can tell the current implementation handles the corner cases, which is what matters most, and with the tests now working, if improvements are made later we can actually make sure regressions aren't created.