From c890f629fd982da2309437eb80fc46357faa73b9 Mon Sep 17 00:00:00 2001 From: gguptp Date: Thu, 12 Sep 2024 17:40:26 +0530 Subject: [PATCH] [FLINK-36270][Connectors/DynamoDB] Assign only children splits of a split when it is marked as finished --- .../DynamoDbStreamsSourceEnumerator.java | 21 ++++- .../enumerator/tracker/SplitTracker.java | 74 +++++++++++++--- .../enumerator/tracker/SplitTrackerTest.java | 85 +++++++++++++++++++ 3 files changed, 166 insertions(+), 14 deletions(-) diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java index e62c0f38..8a8f29ca 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java @@ -128,6 +128,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } } + /** When we mark a split as finished, we will only assign its child splits to the subtasks. 
*/ private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) { splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds()); splitAssignment @@ -137,7 +138,7 @@ private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinis splitsFinishedEvent .getFinishedSplitIds() .contains(split.splitId())); - assignSplits(); + assignChildSplits(splitsFinishedEvent.getFinishedSplitIds()); } private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) { @@ -161,7 +162,7 @@ private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwabl context.registeredReaders().size()); return; } - assignSplits(); + assignAllAvailableSplits(); } /** @@ -200,9 +201,21 @@ private SplitGraphInconsistencyTracker trackSplitsAndResolveInconsistencies( return splitGraphInconsistencyTracker; } - private void assignSplits() { + private void assignAllAvailableSplits() { + List splitsAvailableForAssignment = + splitTracker.splitsAvailableForAssignment(); + assignSplits(splitsAvailableForAssignment); + } + + private void assignChildSplits(Set finishedSplitIds) { + List splitsAvailableForAssignment = + splitTracker.getUnassignedChildSplits(finishedSplitIds); + assignSplits(splitsAvailableForAssignment); + } + + private void assignSplits(List splitsAvailableForAssignment) { Map> newSplitAssignments = new HashMap<>(); - for (DynamoDbStreamsShardSplit split : splitTracker.splitsAvailableForAssignment()) { + for (DynamoDbStreamsShardSplit split : splitsAvailableForAssignment) { assignSplitToSubtask(split, newSplitAssignments); } updateSplitAssignment(newSplitAssignments); diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java index 2269ce6e..73885ddd 100644 --- 
a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java @@ -28,6 +28,8 @@ import org.apache.flink.connector.dynamodb.source.split.StartingPosition; import org.apache.flink.connector.dynamodb.source.util.ShardUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.dynamodb.model.Shard; import java.util.Collection; @@ -50,10 +52,12 @@ @Internal public class SplitTracker { private final Map knownSplits = new ConcurrentHashMap<>(); + private final Map> parentChildSplitMap = new ConcurrentHashMap<>(); private final Set assignedSplits = new HashSet<>(); private final Set finishedSplits = new HashSet<>(); private final String streamArn; private final InitialPosition initialPosition; + private static final Logger LOG = LoggerFactory.getLogger(SplitTracker.class); public SplitTracker(String streamArn, InitialPosition initialPosition) { this(Collections.emptyList(), streamArn, initialPosition); @@ -69,7 +73,7 @@ public SplitTracker( splitWithStatus -> { DynamoDbStreamsShardSplit currentSplit = splitWithStatus.split(); knownSplits.put(currentSplit.splitId(), currentSplit); - + addSplitToMapping(currentSplit); if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) { assignedSplits.add(splitWithStatus.split().splitId()); } @@ -93,10 +97,20 @@ public void addSplits(Collection shardsToAdd) { DynamoDbStreamsShardSplit newSplit = mapToSplit(shard, getStartingPosition(shard, discoveredShardIds)); knownSplits.put(shardId, newSplit); + addSplitToMapping(newSplit); } } } + private void addSplitToMapping(DynamoDbStreamsShardSplit split) { + if (split.getParentShardId() == null) { + return; + } + parentChildSplitMap + .computeIfAbsent(split.getParentShardId(), k -> new HashSet<>()) + .add(split.splitId()); + } + 
/** * If there is no parent in knownSplits of the current shard, that means that the parent has * been read already, so we start the current shard with the specified initial position. We do @@ -159,24 +173,63 @@ public boolean isAssigned(String splitId) { return assignedSplits.contains(splitId); } + /** + * Function to get children splits available for given parent ids. This will ensure not to + * iterate all the values in knownSplits so saving compute + */ + public List getUnassignedChildSplits(Set parentSplitIds) { + return parentSplitIds + .parallelStream() + .filter( + splitId -> { + if (!parentChildSplitMap.containsKey(splitId)) { + LOG.warn( + "splitId: {} is not present in parent-child relationship map. " + + "This indicates that there might be some data loss in the application", + splitId); + } + return parentChildSplitMap.containsKey(splitId); + }) + .map(parentChildSplitMap::get) + .flatMap(Set::stream) + .filter(knownSplits::containsKey) + .map(knownSplits::get) + .filter(this::checkIfSplitCanBeAssigned) + .collect(Collectors.toList()); + } + + /** + * Tells whether a split can be assigned or not. Conditions which it checks: + * + *

- Split should not already be assigned. + * + *

- Split should not already be finished. +

- The parent splits should either be finished or no longer be present in knownSplits. + */ + private boolean checkIfSplitCanBeAssigned(DynamoDbStreamsShardSplit split) { + boolean splitIsNotAssigned = !isAssigned(split.splitId()); + return splitIsNotAssigned + && !isFinished(split.splitId()) + && verifyParentIsEitherFinishedOrCleanedUp(split); + } + /** * Since we never put an inconsistent shard lineage to splitTracker, so if a shard's parent is * not there, that means that that should already be cleaned up. */ public List splitsAvailableForAssignment() { - return knownSplits.values().stream() - .filter( - split -> { - boolean splitIsNotAssigned = !isAssigned(split.splitId()); - return splitIsNotAssigned - && !isFinished(split.splitId()) - && verifyParentIsEitherFinishedOrCleanedUp(split); - }) + return knownSplits + .values() + .parallelStream() + .filter(this::checkIfSplitCanBeAssigned) .collect(Collectors.toList()); } public List snapshotState(long checkpointId) { - return knownSplits.values().stream() + return knownSplits + .values() + .parallelStream() .map( split -> { SplitAssignmentStatus assignmentStatus = @@ -209,6 +262,7 @@ public void cleanUpOldFinishedSplits(Set discoveredSplitIds) { if (isSplitReadyToBeCleanedUp(finishedSplit, discoveredSplitIds)) { finishedSplits.remove(finishedSplitId); knownSplits.remove(finishedSplitId); + parentChildSplitMap.remove(finishedSplit.splitId()); } } } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTrackerTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTrackerTest.java index 599bb2c9..dc614a2a 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTrackerTest.java +++ 
b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTrackerTest.java @@ -228,6 +228,91 @@ public void testOrderedAllUnassignedSplitsWithoutParentsAvailableForAssignment() assertThat(pendingSplits).containsExactlyInAnyOrderElementsOf(splitsWithoutParents); } + @Test + public void testOrderedUnassignedChildSplitsParentsFinishedAvailableForAssignment() { + List finishedParentShards = + Arrays.asList( + generateShard(1, "1000", "3000", null), + generateShard(2, "2000", "4000", null), + generateShard(3, "3000", "5000", null)); + List shardsWithParents = + Arrays.asList( + generateShard(4, "4000", null, generateShardId(1)), + generateShard(5, "5000", null, generateShardId(1)), + generateShard(6, "6000", null, generateShardId(2)), + generateShard(7, "7000", null, generateShardId(3))); + List shardsWithoutParents = + Arrays.asList( + generateShard(8, "8000", null, null), generateShard(9, "9000", null, null)); + + SplitTracker splitTracker = new SplitTracker(STREAM_ARN, InitialPosition.TRIM_HORIZON); + splitTracker.addSplits( + Stream.of(finishedParentShards, shardsWithParents, shardsWithParents) + .flatMap(Collection::stream) + .collect(Collectors.toList())); + splitTracker.markAsFinished( + finishedParentShards.stream().map(Shard::shardId).collect(Collectors.toList())); + + List unassignedChildSplits = + splitTracker.getUnassignedChildSplits( + finishedParentShards.stream() + .map(Shard::shardId) + .collect(Collectors.toSet())); + + assertThat(unassignedChildSplits) + .containsExactlyInAnyOrderElementsOf( + shardsWithParents.stream() + .map(this::getSplitFromShard) + .collect(Collectors.toList())); + assertThat(unassignedChildSplits) + .doesNotContainAnyElementsOf( + shardsWithoutParents.stream() + .map(this::getSplitFromShard) + .collect(Collectors.toList())); + } + + @Test + public void testOrderedUnassignedChildSplitsWithUnknownParentsFinishedAvailableForAssignment() { + List 
finishedParentShards = + Arrays.asList( + generateShard(1, "1000", "3000", null), + generateShard(2, "2000", "4000", null), + generateShard(3, "3000", "5000", null)); + List shardsWithParents = + Arrays.asList( + generateShard(4, "4000", null, generateShardId(1)), + generateShard(5, "5000", null, generateShardId(1)), + generateShard(6, "6000", null, generateShardId(2))); + List shardsWithoutParents = + Arrays.asList( + generateShard(8, "8000", null, null), generateShard(9, "9000", null, null)); + + SplitTracker splitTracker = new SplitTracker(STREAM_ARN, InitialPosition.TRIM_HORIZON); + splitTracker.addSplits( + Stream.of(finishedParentShards, shardsWithParents, shardsWithParents) + .flatMap(Collection::stream) + .collect(Collectors.toList())); + splitTracker.markAsFinished( + finishedParentShards.stream().map(Shard::shardId).collect(Collectors.toList())); + + List unassignedChildSplits = + splitTracker.getUnassignedChildSplits( + finishedParentShards.stream() + .map(Shard::shardId) + .collect(Collectors.toSet())); + + assertThat(unassignedChildSplits) + .containsExactlyInAnyOrderElementsOf( + shardsWithParents.stream() + .map(this::getSplitFromShard) + .collect(Collectors.toList())); + assertThat(unassignedChildSplits) + .doesNotContainAnyElementsOf( + shardsWithoutParents.stream() + .map(this::getSplitFromShard) + .collect(Collectors.toList())); + } + @Test public void testOrderedMarkingParentSplitAsFinishedMakesChildrenAvailableForAssignment() { List shards =