Fix fencepost issues with stream algorithm plus fix tests #76

Merged
mdedetrich merged 2 commits into main from fix-stream-fencepost-errors on Jan 4, 2022

Conversation

mdedetrich (Contributor) commented:

About this change - What it does

This PR fixes the outstanding fencepost issues in the streaming algorithm. It should fix most (very likely all) of the problems identified in #75.

The PR also fixes regressions in the tests (which meant that the mock tests weren't actually testing anything properly) and adds additional tests, including one that critically makes sure that termination of the JSON works correctly, which is important for the restore part of the tool.

Why this way

This PR addresses multiple issues. One of the reasons the PR is non-trivial in size is that the initial intention was only to fix the broken tests, and resolving that revealed other problems. A summary is provided here:

  • The initial reason the tests were not running correctly is that we were using a Java ConcurrentLinkedQueue that was converted to Scala using the standard asScala conversion. Unfortunately I didn't realize that this conversion also changed the data structure from a mutable one to an immutable one, which meant that adding an element to the collection using backedUpData ++ Iterable((key, byteString)) did nothing, due to it being immutable and the new reference being discarded. This was solved by just using the original Java ConcurrentLinkedQueue collection and its .add method (a sketch follows this list).
  • A test was added that verifies termination was done correctly, as mentioned previously
  • Previously the related tests only verified that the observed data from the mock was a subset of the original generated data; the original intent of writing the tests this way was to avoid having to deal with manually terminating the data. It turns out there were a lot of bugs in this specific case, so the tests have now been changed to check precisely that all observed data is exactly the same as the generated data.
  • Due to how the windowing on the stream is done with sliding(2), explicit tests were added for streams of size 1 and 2. Although the property tests generally handle this case, it's more ergonomic to test these cases explicitly so that if a regression is introduced it's picked up reliably.
  • mergeBackedUpData was modified to not lose key insertion order (which is a property of immutable.Map). Extra parameters were also added to mergeBackedUpData to make the testing more reliable.
    • Note that the sort parameter was added after realizing that in some cases the stream is processed out of order, which was causing the "backup method completes flow correctly for two elements" test to fail non-deterministically due to ordering issues (this occurred even when trying to throttle the source). In reality, with an actual backup source this should be a non-concern.
  • Another cause of the tests randomly failing was apparently akka-stream initialization, something @jlprat noticed earlier. Normally this is not an issue because you can figure out when a stream ends after all elements from the Source have been emitted; however, one of the properties of using substreams as a result of groupBy/splitWhen/splitAfter is that the stream runs in perpetuity. Due to this, AkkaStreamInitializationConstant has been added, a time constant that the related tests need to wait for.
  • A large swath of the original streaming algorithm had to be updated because fixing the tests revealed some fencepost cases. In summary, there are two causes of fencepost corner cases, each creating its own permutations that need to be handled:
    • .sliding behaves differently for streams with an even number of elements vs an odd number of elements depending on the step parameter, i.e. notice the difference between
    List(1,2,3,4,5).sliding(2)
    Vector(1, 2)
    Vector(2, 3)
    Vector(3, 4)
    Vector(4, 5)
    vs
    List(1,2,3,4).sliding(2)
    Vector(1, 2)
    Vector(2, 3)
    Vector(3, 4)
    • The actual construction of JSON also added its own corner cases, i.e. a single element in a stream is its own corner case because it creates a JSON array of just a single element which needs to be immediately terminated. This is why in a lot of cases we have to do prefixAndTail(2) vs prefixAndTail(1): we need to see if we are dealing with a stream that only has a single element and handle it explicitly (see the prefixAndTail sketch after this list).
    • The usage of boundaries to detect the time slices has also changed: rather than coupling every ReducedConsumerRecord in the stream with the original Boundary, this PR changes it so that boundaries work closer to tombstones. This made the handling of corner cases much easier.
    • Another unintended improvement is less memory usage. For example, with ByteStringElement, rather than having the key in every element of the stream, it is now only in the Start (which is where it's needed for Sink.lazyInit), and every further element in the stream for this time slice carries only the needed data/context, represented as Tail (a rough sketch of this shape also follows the list).
    • This method is also closer in style to what Alpakka does, i.e. the buffering in Alpakka also inserts watermarks as case objects when detecting various boundaries.
  • Rather than just suppressing cases which we think shouldn't occur with partial functions, Errors.UnhandledStreamCase is now thrown in the catch-all case to better diagnose problems (see the last sketch after this list).
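
A minimal sketch of the asScala bug from the first bullet above — the element types and values are stand-ins for the actual mock code, and the conversion import assumes Scala 2.13:

    import java.util.concurrent.ConcurrentLinkedQueue
    import scala.jdk.CollectionConverters._

    val backedUpData = new ConcurrentLinkedQueue[(String, String)]()

    // Broken: ++ on the converted collection builds a brand new collection and the
    // result is discarded, so backedUpData itself never changes.
    backedUpData.asScala ++ Iterable(("key", "byteString"))

    // Fixed: mutate the underlying Java queue directly.
    backedUpData.add(("key", "byteString"))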
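
A rough sketch of the prefixAndTail(2) idea from the JSON bullet — illustrative only, the element rendering and stage names here are assumptions rather than the actual backup flow:

    import akka.NotUsed
    import akka.stream.scaladsl.{Flow, Source}

    def jsonArrayFlow[T]: Flow[T, String, NotUsed] =
      Flow[T]
        .prefixAndTail(2)
        .flatMapConcat {
          case (Seq(), _) =>
            // Empty substream: emit an empty, already terminated JSON array.
            Source.single("[]")
          case (Seq(single), _) =>
            // Exactly one element arrived before the substream ended, so the
            // array has to be terminated immediately (the corner case above).
            Source.single(s"[$single]")
          case (prefix, rest) =>
            // Two or more elements: stream them out and terminate at the end.
            Source(prefix.toList).concat(rest).map(_.toString).intersperse("[", ",", "]")
        }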
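
A hypothetical sketch of the ByteStringElement shape described above — only Start carries the key (needed for Sink.lazyInit), later elements of the same time slice carry just the data and context; the Context stand-in below is not the real type:

    import akka.util.ByteString

    trait Context // stand-in for the actual per-record context type

    sealed trait ByteStringElement {
      def data: ByteString
      def context: Context
    }

    final case class Start(data: ByteString, context: Context, key: String) extends ByteStringElement
    final case class Tail(data: ByteString, context: Context) extends ByteStringElement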
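
Finally, a small sketch of the "fail loudly instead of suppressing" point from the last bullet — the error shape here is assumed; the real one lives in Errors.scala:

    final case class UnhandledStreamCase[T](elements: Seq[T])
        extends Exception(s"Unhandled case for stream elements: ${elements.mkString(", ")}")

    // Instead of a partial function that silently ignores unexpected windows,
    // the catch-all case now surfaces them.
    def handleWindow(window: Seq[Int]): Int = window match {
      case Seq(single)   => single
      case Seq(first, _) => first
      case rest          => throw UnhandledStreamCase(rest)
    }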

To make sure that these tests are no longer flaky, I ran the tests in a loop using https://stackoverflow.com/a/38283446/1519631 for an entire day to confirm that not a single test failed. Ideally, once this gets merged, rebasing these changes with #75 by itself should fix all of the remaining problems with that PR and finalize the backup portion of the tool.

As a final note, it is definitely plausible that there is a better way of handling all of the streaming corner cases. With the current implementation, however, as far as I can tell the corner cases are handled, which is what matters most, and with the tests now working we can make sure that any future improvements don't introduce regressions.

mdedetrich requested a review from jlprat on December 13, 2021 04:57
mdedetrich force-pushed the fix-stream-fencepost-errors branch from 6ef2fcd to a023848 on December 13, 2021 05:02
mdedetrich force-pushed the fix-stream-fencepost-errors branch from a023848 to cf7c917 on January 2, 2022 22:06
jlprat (Contributor) left a comment:

I have some questions, and minor comments. Feel free to merge if this is blocking you.
We can discuss my minor comments afterwards

transformReducedConsumerRecords(List(element)).map {
  case (byteString, Some(context)) =>
    if (terminate)
      Start(ByteString("[") ++ byteString.dropRight(1) ++ ByteString("]"), context, key)
jlprat (Contributor) commented on this code:

what's on that 1 byte that you drop?

mdedetrich (Contributor, Author) replied on Jan 4, 2022:

It's a , (comma). transformReducedConsumerRecords adds a comma as the last character, which is the standard case for a stream, but this is a corner case where we terminate immediately, so we drop the , so that we have

...}]

instead of

...},]

(which is not valid JSON)

Do you want me to add a comment?

jlprat (Contributor) replied:

Either that, or store this in a val with the name strippedTrailingComma. That would be cool. I think this is one of the things that is quickly forgotten.
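
A sketch of that suggestion, pulled out of context — the helper name and surrounding types are stand-ins; only strippedTrailingComma comes from the comment above:

    import akka.util.ByteString

    trait Context // stand-in for the actual context type
    final case class Start(data: ByteString, context: Context, key: String)

    def terminateSingleElement(byteString: ByteString, context: Context, key: String): Start = {
      // transformReducedConsumerRecords appends a trailing comma; for an
      // immediately terminated single-element array it has to be dropped.
      val strippedTrailingComma = byteString.dropRight(1)
      Start(ByteString("[") ++ strippedTrailingComma ++ ByteString("]"), context, key)
    }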

core/src/main/scala/io/aiven/guardian/kafka/Errors.scala (outdated review thread, resolved)
mdedetrich force-pushed the fix-stream-fencepost-errors branch from cf7c917 to e103c39 on January 4, 2022 02:20
mdedetrich force-pushed the fix-stream-fencepost-errors branch from e103c39 to a871726 on January 4, 2022 02:30
jlprat (Contributor) left a comment:

LGTM. Feel free to add the comment or a variable for the trailing comma removal

jlprat (Contributor) left a comment:

LGTM

mdedetrich merged commit 4b9593f into main on Jan 4, 2022
mdedetrich deleted the fix-stream-fencepost-errors branch on January 4, 2022 08:48