[FLINK-36296][Connectors/DynamoDB] Add support for incremental shard discovery #166

Closed · wants to merge 1 commit
@@ -45,6 +45,12 @@ public enum InitialPosition {
.defaultValue(Duration.ofSeconds(60))
.withDescription("The interval between each attempt to discover new shards.");

public static final ConfigOption<Duration> INCREMENTAL_SHARD_DISCOVERY_INTERVAL =
ConfigOptions.key("flink.shard.incremental.discovery.intervalmillis")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription("The interval between each attempt to incrementally discover new shards.");

public static final ConfigOption<Integer> DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT =
ConfigOptions.key("flink.describestream.inconsistencyresolution.retries")
.intType()
@@ -53,6 +53,7 @@
import java.util.stream.Collectors;

import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT;
import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.INCREMENTAL_SHARD_DISCOVERY_INTERVAL;
import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL;
import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION;

@@ -104,6 +105,13 @@ public DynamoDbStreamsSourceEnumerator(
public void start() {
context.callAsync(this::discoverSplits, this::processDiscoveredSplits);
final long shardDiscoveryInterval = sourceConfig.get(SHARD_DISCOVERY_INTERVAL).toMillis();
final long incrementalShardDiscoveryInterval =
sourceConfig.get(INCREMENTAL_SHARD_DISCOVERY_INTERVAL).toMillis();
context.callAsync(
this::incrementallyDiscoverSplits,
this::processDiscoveredSplits,
0,
incrementalShardDiscoveryInterval);
context.callAsync(
this::discoverSplits,
this::processDiscoveredSplits,
@@ -165,42 +173,6 @@ private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwabl
assignAllAvailableSplits();
}

/**
* This method tracks the discovered splits in a graph and if the graph has inconsistencies, it
* tries to resolve them using DescribeStream calls using the first inconsistent node found in
* the split graph.
*
* @param discoveredSplits splits discovered after calling DescribeStream at the start of the
* application or periodically.
*/
private SplitGraphInconsistencyTracker trackSplitsAndResolveInconsistencies(
ListShardsResult discoveredSplits) {
SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
new SplitGraphInconsistencyTracker();
splitGraphInconsistencyTracker.addNodes(discoveredSplits.getShards());

// we don't want to do inconsistency checks for DISABLED streams because there will be no
// open child shard in DISABLED stream
boolean streamDisabled = discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED);
int describeStreamInconsistencyResolutionCount =
sourceConfig.get(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);
for (int i = 0;
i < describeStreamInconsistencyResolutionCount
&& !streamDisabled
&& splitGraphInconsistencyTracker.inconsistencyDetected();
i++) {
String earliestClosedLeafNodeId =
splitGraphInconsistencyTracker.getEarliestClosedLeafNode();
LOG.warn(
"We have detected inconsistency with DescribeStream output, resolving inconsistency with shardId: {}",
earliestClosedLeafNodeId);
ListShardsResult shardsToResolveInconsistencies =
streamProxy.listShards(streamArn, earliestClosedLeafNodeId);
splitGraphInconsistencyTracker.addNodes(shardsToResolveInconsistencies.getShards());
}
return splitGraphInconsistencyTracker;
}

private void assignAllAvailableSplits() {
List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment =
splitTracker.splitsAvailableForAssignment();
@@ -239,35 +211,75 @@ public void close() throws IOException {
streamProxy.close();
}

private ListShardsResult incrementallyDiscoverSplits() {
List<Shard> shardList = splitTracker.getKnownShards();
SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
new SplitGraphInconsistencyTracker();
splitGraphInconsistencyTracker.addNodes(shardList);
ListShardsResult listShardsResult =
streamProxy.listShards(
streamArn, splitGraphInconsistencyTracker.getLatestLeafNode());
return trackSplitsAndResolveInconsistencies(
splitGraphInconsistencyTracker, listShardsResult);
}

/**
* This method is used to discover DynamoDb Streams splits the job can subscribe to. It can be
* run in parallel, is important to not mutate any shared state.
* This method tracks the discovered splits in a graph and if the graph has inconsistencies, it
* tries to resolve them using DescribeStream calls using the first inconsistent node found in
* the split graph.
*
* @return list of discovered splits
* @param listShardsResult shards discovered after calling DescribeStream at the start of the
* application or periodically via either incremental discovery or full discovery.
*/
private ListShardsResult discoverSplits() {
ListShardsResult listShardsResult = streamProxy.listShards(streamArn, null);
SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
trackSplitsAndResolveInconsistencies(listShardsResult);

private ListShardsResult trackSplitsAndResolveInconsistencies(
SplitGraphInconsistencyTracker splitGraphInconsistencyTracker,
ListShardsResult listShardsResult) {
splitGraphInconsistencyTracker.addNodes(listShardsResult.getShards());
boolean streamDisabled = listShardsResult.getStreamStatus().equals(StreamStatus.DISABLED);
int describeStreamInconsistencyResolutionCount =
sourceConfig.get(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT);
for (int i = 0;
i < describeStreamInconsistencyResolutionCount
&& !streamDisabled
&& splitGraphInconsistencyTracker.inconsistencyDetected();
i++) {
String earliestClosedLeafNodeId =
splitGraphInconsistencyTracker.getEarliestClosedLeafNode();
LOG.warn(
"We have detected inconsistency with DescribeStream output, resolving inconsistency with shardId: {}",
earliestClosedLeafNodeId);
ListShardsResult shardsToResolveInconsistencies =
streamProxy.listShards(streamArn, earliestClosedLeafNodeId);
splitGraphInconsistencyTracker.addNodes(shardsToResolveInconsistencies.getShards());
}
ListShardsResult discoveredSplits = new ListShardsResult();
discoveredSplits.setStreamStatus(listShardsResult.getStreamStatus());
discoveredSplits.setInconsistencyDetected(listShardsResult.getInconsistencyDetected());
List<Shard> shardList = new ArrayList<>(splitGraphInconsistencyTracker.getNodes());
// We do not throw an exception here and just return to let SplitTracker process through
// the splits it has not yet processed. This might be helpful for large streams which see
// a lot of inconsistency issues.
if (splitGraphInconsistencyTracker.inconsistencyDetected()) {
LOG.error(
"There are inconsistencies in DescribeStream which we were not able to resolve. First leaf node on which inconsistency was detected: "
+ splitGraphInconsistencyTracker.getEarliestClosedLeafNode());
return discoveredSplits;
}
discoveredSplits.addShards(shardList);
discoveredSplits.addShards(new ArrayList<>(splitGraphInconsistencyTracker.getNodes()));
return discoveredSplits;
}

/**
* This method is used to discover DynamoDb Streams splits the job can subscribe to. It can be
* run in parallel, so it is important not to mutate any shared state.
*
* @return list of discovered splits
*/
private ListShardsResult discoverSplits() {
ListShardsResult listShardsResult = streamProxy.listShards(streamArn, null);
SplitGraphInconsistencyTracker splitGraphInconsistencyTracker =
new SplitGraphInconsistencyTracker();
splitGraphInconsistencyTracker.addNodes(listShardsResult.getShards());
return trackSplitsAndResolveInconsistencies(
splitGraphInconsistencyTracker, listShardsResult);
}

private void assignSplitToSubtask(
DynamoDbStreamsShardSplit split,
Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments) {
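Editor's note on the enumerator change above: the new `incrementallyDiscoverSplits` path seeds a tracker with the shards already known and then lists only shards after the latest leaf, instead of re-listing the whole stream on every tick. A minimal standalone sketch of that idea, where `listShardsAfter` is a hypothetical stand-in for `streamProxy.listShards(streamArn, lastShardId)` (names here are illustrative, not the Flink API):

```java
import java.util.*;
import java.util.function.Function;

// Sketch of the incremental-discovery idea in this PR: seed from the
// already-known leaves and list only shards created after the latest one,
// instead of re-listing the entire stream on every discovery interval.
// listShardsAfter stands in for streamProxy.listShards(streamArn, shardId).
class IncrementalDiscoverySketch {
    static List<String> discover(
            SortedSet<String> knownLeaves, Function<String, List<String>> listShardsAfter) {
        // A null lastShardId means "list from the beginning", as in full discovery.
        String latestLeaf = knownLeaves.isEmpty() ? null : knownLeaves.last();
        return listShardsAfter.apply(latestLeaf);
    }
}
```

With no known leaves this degenerates to a full listing, which is why the PR keeps the full `discoverSplits` loop running at its own (longer) interval as a safety net.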
@@ -40,13 +40,15 @@
*/
public class SplitGraphInconsistencyTracker {
private final TreeSet<String> closedLeafNodes;
private final TreeSet<String> leafNodes;
private final Map<String, Shard> nodes;
private static final Logger LOG =
LoggerFactory.getLogger(DynamoDbStreamsSourceEnumerator.class);

public SplitGraphInconsistencyTracker() {
nodes = new HashMap<>();
closedLeafNodes = new TreeSet<>();
leafNodes = new TreeSet<>();
}

/**
@@ -68,11 +70,13 @@ public void addNodes(List<Shard> shards) {
private void removeParentFromClosedLeafNodes(Shard shard) {
if (shard.parentShardId() != null) {
closedLeafNodes.remove(shard.parentShardId());
leafNodes.remove(shard.parentShardId());
}
}

private void addNode(Shard shard) {
nodes.put(shard.shardId(), shard);
leafNodes.add(shard.shardId());
if (shard.sequenceNumberRange().endingSequenceNumber() != null) {
closedLeafNodes.add(shard.shardId());
}
@@ -101,6 +105,13 @@ public String getEarliestClosedLeafNode() {
return closedLeafNodes.first();
}

public String getLatestLeafNode() {
Contributor: This is not unit tested!

Author: will add UT.

if (leafNodes.isEmpty()) {
return null;
}
return leafNodes.last();
}

public Collection<Shard> getNodes() {
return nodes.values();
}
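Editor's note on the tracker change above: the new `leafNodes` set mirrors `closedLeafNodes`, except every shard enters it and discovering a child evicts its parent. A standalone model of that bookkeeping (not the Flink class), assuming shard ids sort lexicographically in creation order, which is what makes `TreeSet.last()` the latest leaf:

```java
import java.util.TreeSet;

// Standalone model of the leafNodes bookkeeping this PR adds to
// SplitGraphInconsistencyTracker: every shard enters the leaf set, and
// discovering a child evicts its parent, so the set always holds the
// current leaves in shard-id order.
class LeafTrackerSketch {
    private final TreeSet<String> leafNodes = new TreeSet<>();

    void addNode(String shardId, String parentShardId) {
        if (parentShardId != null) {
            leafNodes.remove(parentShardId); // parent is no longer a leaf
        }
        leafNodes.add(shardId);
    }

    // Mirrors getLatestLeafNode(): null on an empty graph, else the greatest id.
    String latestLeaf() {
        return leafNodes.isEmpty() ? null : leafNodes.last();
    }
}
```

This is also roughly the shape a unit test for `getLatestLeafNode` (requested in the review thread above) would exercise: empty graph returns null, and adding a child moves the latest leaf forward.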
@@ -30,6 +30,7 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.model.SequenceNumberRange;
import software.amazon.awssdk.services.dynamodb.model.Shard;

import java.util.Collection;
@@ -185,7 +186,8 @@ public List<DynamoDbStreamsShardSplit> getUnassignedChildSplits(Set<String> pare
if (!parentChildSplitMap.containsKey(splitId)) {
LOG.warn(
"splitId: {} is not present in parent-child relationship map. "
+ "This indicates that there might be some data loss in the application",
+ "This indicates that either there might be some data loss in the "
+ "application or the child shard hasn't been discovered yet",
Comment on lines +189 to +190

Contributor: Do we see this very often? If so, this is worrying, and we should create a JIRA to track an improvement. We know our setup is resilient against this, because the child will get picked up eventually, but it would be good to not worry people unnecessarily!

Author: Yeah, we see this often if we are always at the tip of the log and child shards haven't been discovered yet :)
splitId);
}
return parentChildSplitMap.containsKey(splitId);
@@ -250,6 +252,24 @@ public Set<String> getKnownSplitIds() {
return knownSplits.keySet();
}

public List<Shard> getKnownShards() {
Contributor: This is not unit tested!

Author: will add UT.
return knownSplits
.values()
.parallelStream()
.filter(
split ->
!ShardUtils.isShardOlderThanInconsistencyDetectionRetentionPeriod(
split.splitId()))
.map(
split ->
Shard.builder()
.shardId(split.splitId())
.parentShardId(split.getParentShardId())
.sequenceNumberRange(SequenceNumberRange.builder().build())
.build())
Comment on lines +268 to +269

Contributor: How does this work with the inconsistency tracker? Don't we use the sequence number to detect open/closed shards?

Author: Yes, you're right. Here we are under the assumption that there's no inconsistent shard returned from splitTracker, so all leaf nodes are open. The shard returned from incremental shard discovery will remove the open leaf node and add itself as a leaf node.
.collect(Collectors.toList());
}
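Editor's note on `getKnownShards()` above: it rebuilds Shard records from tracked splits with an empty `SequenceNumberRange`, so the tracker sees every known shard as open, exactly the assumption discussed in the review thread. A standalone sketch of that mapping (a simplified stub, not the AWS SDK `Shard` type):

```java
import java.util.*;
import java.util.stream.*;

// Sketch of what getKnownShards() does: re-materialise tracked splits as
// Shard-like records with a null (empty) ending sequence number, so the
// inconsistency tracker treats every known shard as open. Assumes the split
// tracker holds no inconsistent shards, per the review thread above.
class KnownShardsSketch {
    static final class ShardStub {
        final String shardId;
        final String parentShardId;
        final String endingSequenceNumber; // null means the shard is open

        ShardStub(String shardId, String parentShardId) {
            this.shardId = shardId;
            this.parentShardId = parentShardId;
            // Mirrors sequenceNumberRange(SequenceNumberRange.builder().build())
            this.endingSequenceNumber = null;
        }
    }

    static List<ShardStub> knownShards(Map<String, String> parentBySplitId) {
        return parentBySplitId.entrySet().stream()
                .map(e -> new ShardStub(e.getKey(), e.getValue()))
                .collect(Collectors.toList());
    }
}
```

Because every rebuilt shard looks open, an incrementally discovered child later evicts its parent from the leaf set rather than relying on the stored sequence-number range.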

/**
* finishedSplits needs to be cleaned up. The logic applied to cleaning up finished splits is
* that if any split has been finished reading, its parent has been finished reading and it is