[FLINK-36296][Connectors/DynamoDB] Add support for incremental shard discovery #166
...a/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
…discovery in DDB Streams source
+ "This indicates that either there might be some data loss in the "
+ "application or the child shard hasn't been discovered yet",
Do we see this very often? If so, this is worrying, and we should create a JIRA to track an improvement. We know our setup is resilient against this, because the child will get picked up eventually, but it would be good to not worry people unnecessarily!
Yes, we see this often when we are always at the tip of the log and the child shards haven't been discovered yet :)
.sequenceNumberRange(SequenceNumberRange.builder().build())
.build())
How does this work with the inconsistency tracker? Don't we use the sequence number to detect open/closed shards?
Yes, you're right. Here we are working under the assumption that no inconsistent shard is returned from splitTracker, so all leaf nodes are open. The shard returned from incremental shard discovery will remove the open leaf node and add itself as a leaf node.
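To make the leaf-node replacement described above concrete, here is a minimal sketch. It is not the connector's code: `LeafNodeSketch`, `addOpenLeaf`, and `onChildDiscovered` are hypothetical names, and the shard IDs are made up. It only assumes, as stated in the reply, that every tracked leaf is open when a child arrives from incremental discovery.

```java
import java.util.TreeSet;

/** Hypothetical, simplified leaf tracker; not the connector's actual class. */
public class LeafNodeSketch {
    // Assumption: shard IDs sort so that later shards compare greater.
    private final TreeSet<String> openLeafNodes = new TreeSet<>();

    /** Registers a shard that currently has no known children. */
    public void addOpenLeaf(String shardId) {
        openLeafNodes.add(shardId);
    }

    /**
     * Applies a child shard found by incremental discovery: the parent stops
     * being a leaf and the child takes its place.
     *
     * @return true if the parent was tracked as an open leaf before the call
     */
    public boolean onChildDiscovered(String parentShardId, String childShardId) {
        boolean parentWasLeaf = openLeafNodes.remove(parentShardId);
        openLeafNodes.add(childShardId);
        return parentWasLeaf;
    }

    /** Returns the greatest tracked leaf ID, or null if there are no leaves. */
    public String getLatestLeafNode() {
        return openLeafNodes.isEmpty() ? null : openLeafNodes.last();
    }

    /** Returns true if the given shard is currently tracked as a leaf. */
    public boolean isLeaf(String shardId) {
        return openLeafNodes.contains(shardId);
    }
}
```

If an inconsistent (closed but childless) shard could reach this path, the assumption no longer holds and the tracker would need to distinguish open from closed leaves, which is the concern raised in the question.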
@@ -250,6 +252,24 @@ public Set<String> getKnownSplitIds() {
return knownSplits.keySet();
}

public List<Shard> getKnownShards() {
This is not unit tested!
Will add a unit test.
@@ -101,6 +105,13 @@ public String getEarliestClosedLeafNode() {
return closedLeafNodes.first();
}

public String getLatestLeafNode() {
This is not unit tested!
Will add a unit test.
Thanks for the contribution, @gguptp! Left some comments.
Purpose of the change
Adds incremental shard discovery for the DDB Streams source. Because a full DescribeStream discovery can take a long time, incremental shard discovery lets us pick up child shards before the full discovery completes.
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
Significant changes
(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)
@Public(Evolving)
)