Skip to content

Commit

Permalink
[FLINK-36270][Connectors/DynamoDB] Assign only children splits of a s…
Browse files Browse the repository at this point in the history
…plit when it is marked as finished
  • Loading branch information
gguptp committed Sep 13, 2024
1 parent 5d2deb1 commit c890f62
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
}
}

/** When we mark a split as finished, we will only assign its child splits to the subtasks. */
private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
splitAssignment
Expand All @@ -137,7 +138,7 @@ private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinis
splitsFinishedEvent
.getFinishedSplitIds()
.contains(split.splitId()));
assignSplits();
assignChildSplits(splitsFinishedEvent.getFinishedSplitIds());
}

private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
Expand All @@ -161,7 +162,7 @@ private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwabl
context.registeredReaders().size());
return;
}
assignSplits();
assignAllAvailableSplits();
}

/**
Expand Down Expand Up @@ -200,9 +201,21 @@ private SplitGraphInconsistencyTracker trackSplitsAndResolveInconsistencies(
return splitGraphInconsistencyTracker;
}

private void assignSplits() {
private void assignAllAvailableSplits() {
List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment =
splitTracker.splitsAvailableForAssignment();
assignSplits(splitsAvailableForAssignment);
}

private void assignChildSplits(Set<String> finishedSplitIds) {
List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment =
splitTracker.getUnassignedChildSplits(finishedSplitIds);
assignSplits(splitsAvailableForAssignment);
}

private void assignSplits(List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment) {
Map<Integer, List<DynamoDbStreamsShardSplit>> newSplitAssignments = new HashMap<>();
for (DynamoDbStreamsShardSplit split : splitTracker.splitsAvailableForAssignment()) {
for (DynamoDbStreamsShardSplit split : splitsAvailableForAssignment) {
assignSplitToSubtask(split, newSplitAssignments);
}
updateSplitAssignment(newSplitAssignments);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
import org.apache.flink.connector.dynamodb.source.util.ShardUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.model.Shard;

import java.util.Collection;
Expand All @@ -50,10 +52,12 @@
@Internal
public class SplitTracker {
private final Map<String, DynamoDbStreamsShardSplit> knownSplits = new ConcurrentHashMap<>();
private final Map<String, Set<String>> parentChildSplitMap = new ConcurrentHashMap<>();
private final Set<String> assignedSplits = new HashSet<>();
private final Set<String> finishedSplits = new HashSet<>();
private final String streamArn;
private final InitialPosition initialPosition;
private static final Logger LOG = LoggerFactory.getLogger(SplitTracker.class);

public SplitTracker(String streamArn, InitialPosition initialPosition) {
this(Collections.emptyList(), streamArn, initialPosition);
Expand All @@ -69,7 +73,7 @@ public SplitTracker(
splitWithStatus -> {
DynamoDbStreamsShardSplit currentSplit = splitWithStatus.split();
knownSplits.put(currentSplit.splitId(), currentSplit);

addSplitToMapping(currentSplit);
if (ASSIGNED.equals(splitWithStatus.assignmentStatus())) {
assignedSplits.add(splitWithStatus.split().splitId());
}
Expand All @@ -93,10 +97,20 @@ public void addSplits(Collection<Shard> shardsToAdd) {
DynamoDbStreamsShardSplit newSplit =
mapToSplit(shard, getStartingPosition(shard, discoveredShardIds));
knownSplits.put(shardId, newSplit);
addSplitToMapping(newSplit);
}
}
}

private void addSplitToMapping(DynamoDbStreamsShardSplit split) {
if (split.getParentShardId() == null) {
return;
}
parentChildSplitMap
.computeIfAbsent(split.getParentShardId(), k -> new HashSet<>())
.add(split.splitId());
}

/**
* If there is no parent in knownSplits of the current shard, that means that the parent has
* been read already, so we start the current shard with the specified initial position. We do
Expand Down Expand Up @@ -159,24 +173,63 @@ public boolean isAssigned(String splitId) {
return assignedSplits.contains(splitId);
}

/**
* Function to get children splits available for given parent ids. This will ensure not to
* iterate all the values in knownSplits so saving compute
*/
public List<DynamoDbStreamsShardSplit> getUnassignedChildSplits(Set<String> parentSplitIds) {
return parentSplitIds
.parallelStream()
.filter(
splitId -> {
if (!parentChildSplitMap.containsKey(splitId)) {
LOG.warn(
"splitId: {} is not present in parent-child relationship map. "
+ "This indicates that there might be some data loss in the application",
splitId);
}
return parentChildSplitMap.containsKey(splitId);
})
.map(parentChildSplitMap::get)
.flatMap(Set::stream)
.filter(knownSplits::containsKey)
.map(knownSplits::get)
.filter(this::checkIfSplitCanBeAssigned)
.collect(Collectors.toList());
}

/**
* Tells whether a split can be assigned or not. Conditions which it checks:
*
* <p>- Split should not already be assigned.
*
* <p>- Split should not be already finished.
*
* <p>- The parent splits should either be finished or no longer be present in knownSplits.
*/
private boolean checkIfSplitCanBeAssigned(DynamoDbStreamsShardSplit split) {
boolean splitIsNotAssigned = !isAssigned(split.splitId());
return splitIsNotAssigned
&& !isFinished(split.splitId())
&& verifyParentIsEitherFinishedOrCleanedUp(split);
}

/**
* Since we never put an inconsistent shard lineage to splitTracker, so if a shard's parent is
* not there, that means that that should already be cleaned up.
*/
public List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment() {
return knownSplits.values().stream()
.filter(
split -> {
boolean splitIsNotAssigned = !isAssigned(split.splitId());
return splitIsNotAssigned
&& !isFinished(split.splitId())
&& verifyParentIsEitherFinishedOrCleanedUp(split);
})
return knownSplits
.values()
.parallelStream()
.filter(this::checkIfSplitCanBeAssigned)
.collect(Collectors.toList());
}

public List<DynamoDBStreamsShardSplitWithAssignmentStatus> snapshotState(long checkpointId) {
return knownSplits.values().stream()
return knownSplits
.values()
.parallelStream()
.map(
split -> {
SplitAssignmentStatus assignmentStatus =
Expand Down Expand Up @@ -209,6 +262,7 @@ public void cleanUpOldFinishedSplits(Set<String> discoveredSplitIds) {
if (isSplitReadyToBeCleanedUp(finishedSplit, discoveredSplitIds)) {
finishedSplits.remove(finishedSplitId);
knownSplits.remove(finishedSplitId);
parentChildSplitMap.remove(finishedSplit.splitId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,91 @@ public void testOrderedAllUnassignedSplitsWithoutParentsAvailableForAssignment()
assertThat(pendingSplits).containsExactlyInAnyOrderElementsOf(splitsWithoutParents);
}

@Test
public void testOrderedUnassignedChildSplitsParentsFinishedAvailableForAssignment() {
List<Shard> finishedParentShards =
Arrays.asList(
generateShard(1, "1000", "3000", null),
generateShard(2, "2000", "4000", null),
generateShard(3, "3000", "5000", null));
List<Shard> shardsWithParents =
Arrays.asList(
generateShard(4, "4000", null, generateShardId(1)),
generateShard(5, "5000", null, generateShardId(1)),
generateShard(6, "6000", null, generateShardId(2)),
generateShard(7, "7000", null, generateShardId(3)));
List<Shard> shardsWithoutParents =
Arrays.asList(
generateShard(8, "8000", null, null), generateShard(9, "9000", null, null));

SplitTracker splitTracker = new SplitTracker(STREAM_ARN, InitialPosition.TRIM_HORIZON);
splitTracker.addSplits(
Stream.of(finishedParentShards, shardsWithParents, shardsWithParents)
.flatMap(Collection::stream)
.collect(Collectors.toList()));
splitTracker.markAsFinished(
finishedParentShards.stream().map(Shard::shardId).collect(Collectors.toList()));

List<DynamoDbStreamsShardSplit> unassignedChildSplits =
splitTracker.getUnassignedChildSplits(
finishedParentShards.stream()
.map(Shard::shardId)
.collect(Collectors.toSet()));

assertThat(unassignedChildSplits)
.containsExactlyInAnyOrderElementsOf(
shardsWithParents.stream()
.map(this::getSplitFromShard)
.collect(Collectors.toList()));
assertThat(unassignedChildSplits)
.doesNotContainAnyElementsOf(
shardsWithoutParents.stream()
.map(this::getSplitFromShard)
.collect(Collectors.toList()));
}

@Test
public void testOrderedUnassignedChildSplitsWithUnknownParentsFinishedAvailableForAssignment() {
List<Shard> finishedParentShards =
Arrays.asList(
generateShard(1, "1000", "3000", null),
generateShard(2, "2000", "4000", null),
generateShard(3, "3000", "5000", null));
List<Shard> shardsWithParents =
Arrays.asList(
generateShard(4, "4000", null, generateShardId(1)),
generateShard(5, "5000", null, generateShardId(1)),
generateShard(6, "6000", null, generateShardId(2)));
List<Shard> shardsWithoutParents =
Arrays.asList(
generateShard(8, "8000", null, null), generateShard(9, "9000", null, null));

SplitTracker splitTracker = new SplitTracker(STREAM_ARN, InitialPosition.TRIM_HORIZON);
splitTracker.addSplits(
Stream.of(finishedParentShards, shardsWithParents, shardsWithParents)
.flatMap(Collection::stream)
.collect(Collectors.toList()));
splitTracker.markAsFinished(
finishedParentShards.stream().map(Shard::shardId).collect(Collectors.toList()));

List<DynamoDbStreamsShardSplit> unassignedChildSplits =
splitTracker.getUnassignedChildSplits(
finishedParentShards.stream()
.map(Shard::shardId)
.collect(Collectors.toSet()));

assertThat(unassignedChildSplits)
.containsExactlyInAnyOrderElementsOf(
shardsWithParents.stream()
.map(this::getSplitFromShard)
.collect(Collectors.toList()));
assertThat(unassignedChildSplits)
.doesNotContainAnyElementsOf(
shardsWithoutParents.stream()
.map(this::getSplitFromShard)
.collect(Collectors.toList()));
}

@Test
public void testOrderedMarkingParentSplitAsFinishedMakesChildrenAvailableForAssignment() {
List<Shard> shards =
Expand Down

0 comments on commit c890f62

Please sign in to comment.