-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36296][Connectors/DynamoDB] Add support for incremental shard discovery #166
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ | |
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
import software.amazon.awssdk.services.dynamodb.model.SequenceNumberRange; | ||
import software.amazon.awssdk.services.dynamodb.model.Shard; | ||
|
||
import java.util.Collection; | ||
|
@@ -185,7 +186,8 @@ public List<DynamoDbStreamsShardSplit> getUnassignedChildSplits(Set<String> pare | |
if (!parentChildSplitMap.containsKey(splitId)) { | ||
LOG.warn( | ||
"splitId: {} is not present in parent-child relationship map. " | ||
+ "This indicates that there might be some data loss in the application", | ||
+ "This indicates that either there might be some data loss in the " | ||
+ "application or the child shard hasn't been discovered yet", | ||
Comment on lines
+189
to
+190
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we see this very often? If so, this is worrying, and we should create a JIRA to track an improvement. We know our setup is resilient against this, because the child will get picked up eventually, but it would be good to not worry people unnecessarily! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeh, we see this often if we are always at the tip of the log and child shards haven't been discovered yet :) |
||
splitId); | ||
} | ||
return parentChildSplitMap.containsKey(splitId); | ||
|
@@ -250,6 +252,24 @@ public Set<String> getKnownSplitIds() { | |
return knownSplits.keySet(); | ||
} | ||
|
||
public List<Shard> getKnownShards() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not unit tested! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will add UT. |
||
return knownSplits | ||
.values() | ||
.parallelStream() | ||
.filter( | ||
split -> | ||
!ShardUtils.isShardOlderThanInconsistencyDetectionRetentionPeriod( | ||
split.splitId())) | ||
.map( | ||
split -> | ||
Shard.builder() | ||
.shardId(split.splitId()) | ||
.parentShardId(split.getParentShardId()) | ||
.sequenceNumberRange(SequenceNumberRange.builder().build()) | ||
.build()) | ||
Comment on lines
+268
to
+269
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does this work with the inconsistency tracker? Don't we use the sequence number to detect open/closed shards? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, you're right, here we are under assumption that there's no inconsistent shard returend from splitTracker so all leaf nodes are open. The shard returned from incremental shard discovery will remove the open leaf node and add itself as a leaf node |
||
.collect(Collectors.toList()); | ||
} | ||
|
||
/** | ||
* finishedSplits needs to be cleaned up. The logic applied to cleaning up finished splits is | ||
* that if any split has been finished reading, its parent has been finished reading and it is | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not unit tested!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will add UT.