-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stream reorder example. Issue #179 #411
Stream reorder example. Issue #179 #411
Conversation
c09deaf
to
ac93d44
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @sshcherbakov looks good overall, I just have a couple of comments.
*/ | ||
public class ReorderIntegrationTest { | ||
|
||
public static class ReorderTransfomer<K, V> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: spelling RecordTransfomer
-> RecordTransformer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
* <p> | ||
* Makes sense only on per partition basis. | ||
* <p> | ||
* Reordering occurs within time windows defined by the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think you meant to say "....defined by the provided grace
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Amended
*/ | ||
@Override | ||
public KeyValue<K, V> transform(final K key, final V value) { | ||
final long ts = timestampExtractor.getTimestamp(value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't used, unless you want to provide this for demo purposes - so maybe add a comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Kafka Streams TimestampExtractor could have been reused to illustrate pulling the timestamp from the payload, but the method signature is not convenient. Instead of duplicating TimestampExtractor interface with convenient signature removed removed this piece to reduce confusion.
* @param timestamp – stream time of the punctuate function call | ||
*/ | ||
void punctuate(final long timestamp) { | ||
try(KeyValueIterator<K, V> it = reorderStore.all()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comments state to use range(from, to)
, but the code uses all()
since the code deletes everything in the store after forwarding, I think it will continue to work as-is, but you should update the comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated comments
7fca2b1
to
b0535d3
Compare
b0535d3
to
9e17e16
Compare
Rebased PR on top of the current master branch. I cannot build master but I checked the test with the latest release branch |
try(KeyValueIterator<K, V> it = reorderStore.all()) { | ||
while (it.hasNext()) { | ||
final KeyValue<K, V> kv = it.next(); | ||
context.forward(kv.key, kv.value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this implementation is flawed. Ordering only makes sense per partition/key.
Say incoming records have key k1
, and there are two messages with timestamps t0
, and t1
. The StoreKeyGenerator
will map that into some combination of the original key and the record timestamp, say something simple like t0_k1
and t1_k1
. However, when you forward these message with the "composite" store key, they'll likely end up in two different partitions. Hence, although you sorted them, any downstream processor/consumer has no guarantee in the order after the re-partition. The correct thing to do would be to preserve the original key, and use that to forward the message, while using the composite key to store/sort/dedup records.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is an example. It can be extended with either additional store for composite to original key mapping or a second interface that would allow extracting original key from the composite one before passing downstream.
final K storeKey = storeKeyGenerator.getStoreKey(key, value); | ||
final V storeValue = reorderStore.get(storeKey); | ||
if(storeValue == null) { | ||
reorderStore.put(storeKey, value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
putIfAbsent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean pitIfAbsent() is equivalent or it has different logic?
In its description: "Update the value associated with this key, unless a value is already associated with the key."
Before I start interpreting that and guessing whether it will only put my value where there is no key or even when key is there but the value is null, I write my intent explicitly through additional "if" line here.
We are currently starting to migrate the examples in this repository to https://github.com/confluentinc/tutorials, so it didn't make sense to merge this here just to move it later. There is a PR now under review. As a result, I'm closing this PR now. \cc @mjsax , @sshcherbakov |
Thank you @sshcherbakov for the awesome contribution! |
Example of reordering out of order messages using a stateful transformer.