From dcf03d1a3696ba6321c32a9573e1fdba2c78f548 Mon Sep 17 00:00:00 2001 From: gguptp Date: Tue, 17 Sep 2024 13:12:09 +0530 Subject: [PATCH] [FLINK-36296][Connectors/DynamoDB] Add support for incremental shard discovery in DDB Streams source --- .../DynamodbStreamsSourceConfigConstants.java | 6 + .../DynamoDbStreamsSourceEnumerator.java | 112 ++++++++++-------- .../SplitGraphInconsistencyTracker.java | 11 ++ .../enumerator/tracker/SplitTracker.java | 22 +++- .../DynamoDbStreamsSourceEnumeratorTest.java | 107 +++++++++++++++-- .../SplitGraphInconsistencyTrackerTest.java | 16 ++- .../util/DynamoDbStreamsProxyProvider.java | 14 ++- 7 files changed, 224 insertions(+), 64 deletions(-) diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java index 290a9758..064c5bf7 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java @@ -45,6 +45,12 @@ public enum InitialPosition { .defaultValue(Duration.ofSeconds(60)) .withDescription("The interval between each attempt to discover new shards."); + public static final ConfigOption INCREMENTAL_SHARD_DISCOVERY_INTERVAL = ConfigOptions.key("flink.shard.incremental.discovery.intervalmillis") .durationType() .defaultValue(Duration.ofSeconds(60)) .withDescription("The interval between each attempt to incrementally discover new shards, starting from the latest known shard."); + public static final ConfigOption DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT = ConfigOptions.key("flink.describestream.inconsistencyresolution.retries") .intType() diff --git 
a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java index 8a8f29ca..7b2c8b8e 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java @@ -53,6 +53,7 @@ import java.util.stream.Collectors; import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT; +import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.INCREMENTAL_SHARD_DISCOVERY_INTERVAL; import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.SHARD_DISCOVERY_INTERVAL; import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION; @@ -104,6 +105,13 @@ public DynamoDbStreamsSourceEnumerator( public void start() { context.callAsync(this::discoverSplits, this::processDiscoveredSplits); final long shardDiscoveryInterval = sourceConfig.get(SHARD_DISCOVERY_INTERVAL).toMillis(); + final long incrementalShardDiscoveryInterval = + sourceConfig.get(INCREMENTAL_SHARD_DISCOVERY_INTERVAL).toMillis(); + context.callAsync( + this::incrementallyDiscoverSplits, + this::processDiscoveredSplits, + 0, + incrementalShardDiscoveryInterval); context.callAsync( this::discoverSplits, this::processDiscoveredSplits, @@ -165,42 +173,6 @@ private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwabl assignAllAvailableSplits(); } - /** - * This method tracks the discovered 
splits in a graph and if the graph has inconsistencies, it - * tries to resolve them using DescribeStream calls using the first inconsistent node found in - * the split graph. - * - * @param discoveredSplits splits discovered after calling DescribeStream at the start of the - * application or periodically. - */ - private SplitGraphInconsistencyTracker trackSplitsAndResolveInconsistencies( - ListShardsResult discoveredSplits) { - SplitGraphInconsistencyTracker splitGraphInconsistencyTracker = - new SplitGraphInconsistencyTracker(); - splitGraphInconsistencyTracker.addNodes(discoveredSplits.getShards()); - - // we don't want to do inconsistency checks for DISABLED streams because there will be no - // open child shard in DISABLED stream - boolean streamDisabled = discoveredSplits.getStreamStatus().equals(StreamStatus.DISABLED); - int describeStreamInconsistencyResolutionCount = - sourceConfig.get(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT); - for (int i = 0; - i < describeStreamInconsistencyResolutionCount - && !streamDisabled - && splitGraphInconsistencyTracker.inconsistencyDetected(); - i++) { - String earliestClosedLeafNodeId = - splitGraphInconsistencyTracker.getEarliestClosedLeafNode(); - LOG.warn( - "We have detected inconsistency with DescribeStream output, resolving inconsistency with shardId: {}", - earliestClosedLeafNodeId); - ListShardsResult shardsToResolveInconsistencies = - streamProxy.listShards(streamArn, earliestClosedLeafNodeId); - splitGraphInconsistencyTracker.addNodes(shardsToResolveInconsistencies.getShards()); - } - return splitGraphInconsistencyTracker; - } - private void assignAllAvailableSplits() { List splitsAvailableForAssignment = splitTracker.splitsAvailableForAssignment(); @@ -239,35 +211,75 @@ public void close() throws IOException { streamProxy.close(); } + private ListShardsResult incrementallyDiscoverSplits() { + List shardList = splitTracker.getKnownShards(); + SplitGraphInconsistencyTracker splitGraphInconsistencyTracker 
= + new SplitGraphInconsistencyTracker(); + splitGraphInconsistencyTracker.addNodes(shardList); + ListShardsResult listShardsResult = + streamProxy.listShards( + streamArn, splitGraphInconsistencyTracker.getLatestLeafNode()); + return trackSplitsAndResolveInconsistencies( + splitGraphInconsistencyTracker, listShardsResult); + } + /** - * This method is used to discover DynamoDb Streams splits the job can subscribe to. It can be - * run in parallel, is important to not mutate any shared state. + * This method tracks the discovered splits in a graph and if the graph has inconsistencies, it + * tries to resolve them using DescribeStream calls using the first inconsistent node found in + * the split graph. * - * @return list of discovered splits + * @param listShardsResult shards discovered after calling DescribeStream at the start of the + * application or periodically via either incremental discovery or full discovery. */ - private ListShardsResult discoverSplits() { - ListShardsResult listShardsResult = streamProxy.listShards(streamArn, null); - SplitGraphInconsistencyTracker splitGraphInconsistencyTracker = - trackSplitsAndResolveInconsistencies(listShardsResult); - + private ListShardsResult trackSplitsAndResolveInconsistencies( + SplitGraphInconsistencyTracker splitGraphInconsistencyTracker, + ListShardsResult listShardsResult) { + splitGraphInconsistencyTracker.addNodes(listShardsResult.getShards()); + boolean streamDisabled = listShardsResult.getStreamStatus().equals(StreamStatus.DISABLED); + int describeStreamInconsistencyResolutionCount = + sourceConfig.get(DESCRIBE_STREAM_INCONSISTENCY_RESOLUTION_RETRY_COUNT); + for (int i = 0; + i < describeStreamInconsistencyResolutionCount + && !streamDisabled + && splitGraphInconsistencyTracker.inconsistencyDetected(); + i++) { + String earliestClosedLeafNodeId = + splitGraphInconsistencyTracker.getEarliestClosedLeafNode(); + LOG.warn( + "We have detected inconsistency with DescribeStream output, resolving inconsistency 
with shardId: {}", + earliestClosedLeafNodeId); + ListShardsResult shardsToResolveInconsistencies = + streamProxy.listShards(streamArn, earliestClosedLeafNodeId); + splitGraphInconsistencyTracker.addNodes(shardsToResolveInconsistencies.getShards()); + } ListShardsResult discoveredSplits = new ListShardsResult(); discoveredSplits.setStreamStatus(listShardsResult.getStreamStatus()); discoveredSplits.setInconsistencyDetected(listShardsResult.getInconsistencyDetected()); - List shardList = new ArrayList<>(splitGraphInconsistencyTracker.getNodes()); - // We do not throw an exception here and just return to let SplitTracker process through the - // splits it has not yet processed. This might be helpful for large streams which see a lot - // of - // inconsistency issues. if (splitGraphInconsistencyTracker.inconsistencyDetected()) { LOG.error( "There are inconsistencies in DescribeStream which we were not able to resolve. First leaf node on which inconsistency was detected:" + splitGraphInconsistencyTracker.getEarliestClosedLeafNode()); return discoveredSplits; } - discoveredSplits.addShards(shardList); + discoveredSplits.addShards(new ArrayList<>(splitGraphInconsistencyTracker.getNodes())); return discoveredSplits; } + /** + * This method is used to discover DynamoDb Streams splits the job can subscribe to. It can be + * run in parallel, is important to not mutate any shared state. 
+ * + * @return list of discovered splits + */ + private ListShardsResult discoverSplits() { + ListShardsResult listShardsResult = streamProxy.listShards(streamArn, null); + SplitGraphInconsistencyTracker splitGraphInconsistencyTracker = + new SplitGraphInconsistencyTracker(); + splitGraphInconsistencyTracker.addNodes(listShardsResult.getShards()); + return trackSplitsAndResolveInconsistencies( + splitGraphInconsistencyTracker, listShardsResult); + } + private void assignSplitToSubtask( DynamoDbStreamsShardSplit split, Map> newSplitAssignments) { diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitGraphInconsistencyTracker.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitGraphInconsistencyTracker.java index 68306fee..fb6eac21 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitGraphInconsistencyTracker.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitGraphInconsistencyTracker.java @@ -40,6 +40,7 @@ */ public class SplitGraphInconsistencyTracker { private final TreeSet closedLeafNodes; + private final TreeSet leafNodes; private final Map nodes; private static final Logger LOG = LoggerFactory.getLogger(DynamoDbStreamsSourceEnumerator.class); @@ -47,6 +48,7 @@ public class SplitGraphInconsistencyTracker { public SplitGraphInconsistencyTracker() { nodes = new HashMap<>(); closedLeafNodes = new TreeSet<>(); + leafNodes = new TreeSet<>(); } /** @@ -68,11 +70,13 @@ public void addNodes(List shards) { private void removeParentFromClosedLeafNodes(Shard shard) { if (shard.parentShardId() != null) { closedLeafNodes.remove(shard.parentShardId()); + leafNodes.remove(shard.parentShardId()); } } private void addNode(Shard shard) { 
nodes.put(shard.shardId(), shard); + leafNodes.add(shard.shardId()); if (shard.sequenceNumberRange().endingSequenceNumber() != null) { closedLeafNodes.add(shard.shardId()); } @@ -101,6 +105,13 @@ public String getEarliestClosedLeafNode() { return closedLeafNodes.first(); } + public String getLatestLeafNode() { + if (leafNodes.isEmpty()) { + return null; + } + return leafNodes.last(); + } + public Collection getNodes() { return nodes.values(); } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java index 73885ddd..6ce0ce92 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.dynamodb.model.SequenceNumberRange; import software.amazon.awssdk.services.dynamodb.model.Shard; import java.util.Collection; @@ -185,7 +186,8 @@ public List getUnassignedChildSplits(Set pare if (!parentChildSplitMap.containsKey(splitId)) { LOG.warn( "splitId: {} is not present in parent-child relationship map. 
" - + "This indicates that there might be some data loss in the application", + + "This indicates that either there might be some data loss in the " + + "application or the child shard hasn't been discovered yet", splitId); } return parentChildSplitMap.containsKey(splitId); @@ -250,6 +252,24 @@ public Set getKnownSplitIds() { return knownSplits.keySet(); } + public List getKnownShards() { + return knownSplits + .values() + .parallelStream() + .filter( + split -> + !ShardUtils.isShardOlderThanInconsistencyDetectionRetentionPeriod( + split.splitId())) + .map( + split -> + Shard.builder() + .shardId(split.splitId()) + .parentShardId(split.getParentShardId()) + .sequenceNumberRange(SequenceNumberRange.builder().build()) + .build()) + .collect(Collectors.toList()); + } + /** * finishedSplits needs to be cleaned up. The logic applied to cleaning up finished splits is * that if any split has been finished reading, its parent has been finished reading and it is diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java index fe6ea38e..23427e2e 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java @@ -40,6 +40,8 @@ import software.amazon.awssdk.services.dynamodb.model.Shard; import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType; +import java.time.Duration; +import java.time.Instant; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -87,7 +89,7 @@ void 
testStartWithoutStateDiscoversAndAssignsShards( enumerator.start(); // Then initial discovery scheduled, with periodic discovery after assertThat(context.getOneTimeCallables()).hasSize(1); - assertThat(context.getPeriodicCallables()).hasSize(1); + assertThat(context.getPeriodicCallables()).hasSize(2); // Given there is one registered reader, with 4 shards in stream final int subtaskId = 1; @@ -128,7 +130,7 @@ void testStartWithoutStateDiscoversAndAssignsShards( // Given no resharding occurs (list of shards remains the same) // When first periodic discovery runs - context.runPeriodicCallable(0); + context.runPeriodicCallable(1); // Then no additional splits are assigned SplitsAssignment noUpdateSplitAssignment = context.getSplitsAssignmentSequence().get(1); @@ -173,7 +175,7 @@ void testStartWithStateDoesNotAssignCompletedShards( state); // When enumerator starts enumerator.start(); - assertThat(context.getPeriodicCallables()).hasSize(1); + assertThat(context.getPeriodicCallables()).hasSize(2); // Given there is one registered reader, with 4 shards in stream final int subtaskId = 1; @@ -188,7 +190,7 @@ void testStartWithStateDoesNotAssignCompletedShards( }; streamProxy.addShards(shards); // When first periodic discovery of shards - context.runPeriodicCallable(0); + context.runPeriodicCallable(1); // Then newer shards will be discovered and read from TRIM_HORIZON, independent of // configured starting position SplitsAssignment firstUpdateSplitAssignment = @@ -212,6 +214,95 @@ void testStartWithStateDoesNotAssignCompletedShards( } } + @ParameterizedTest + @MethodSource("provideInitialPositions") + void testStartWithStateDoesNotAssignCompletedShardsForIncrementalDiscovery( + DynamodbStreamsSourceConfigConstants.InitialPosition initialPosition, + ShardIteratorType shardIteratorType) + throws Throwable { + try (MockSplitEnumeratorContext context = + new MockSplitEnumeratorContext<>(NUM_SUBTASKS)) { + DynamoDbStreamsProxyProvider.TestDynamoDbStreamsProxy streamProxy = + 
getTestStreamProxy(); + final Shard completedShard = + generateShard(Instant.now().toEpochMilli(), "1000", "2000", null); + + DynamoDbStreamsSourceEnumeratorState state = + new DynamoDbStreamsSourceEnumeratorState( + Collections.singletonList( + new DynamoDBStreamsShardSplitWithAssignmentStatus( + new DynamoDbStreamsShardSplit( + STREAM_ARN, + completedShard.shardId(), + StartingPosition.fromStart(), + completedShard.parentShardId()), + SplitAssignmentStatus.FINISHED))); + + final Configuration sourceConfig = new Configuration(); + sourceConfig.set(STREAM_INITIAL_POSITION, initialPosition); + + // Given enumerator is initialised with state + DynamoDbStreamsSourceEnumerator enumerator = + new DynamoDbStreamsSourceEnumerator( + context, + STREAM_ARN, + sourceConfig, + streamProxy, + ShardAssignerFactory.uniformShardAssigner(), + state); + // When enumerator starts + enumerator.start(); + assertThat(context.getPeriodicCallables()).hasSize(2); + + // Given there is one registered reader, with 4 shards in stream + final int subtaskId = 1; + context.registerReader(TestUtil.getTestReaderInfo(subtaskId)); + enumerator.addReader(subtaskId); + Shard[] shards = + new Shard[] { + completedShard, + generateShard( + Instant.now().plus(Duration.ofHours(1)).toEpochMilli(), + "2100", + null, + completedShard.shardId()), + generateShard( + Instant.now().plus(Duration.ofHours(2)).toEpochMilli(), + "2200", + null, + null), + generateShard( + Instant.now().plus(Duration.ofHours(3)).toEpochMilli(), + "3000", + null, + null) + }; + streamProxy.addShards(shards); + // When first periodic discovery of shards + context.runPeriodicCallable(0); + // Then newer shards will be discovered and read from TRIM_HORIZON, independent of + // configured starting position + SplitsAssignment firstUpdateSplitAssignment = + context.getSplitsAssignmentSequence().get(0); + assertThat(firstUpdateSplitAssignment.assignment()).containsOnlyKeys(subtaskId); + assertThat( + 
firstUpdateSplitAssignment.assignment().get(subtaskId).stream() + .map(DynamoDbStreamsShardSplit::getShardId)) + .containsExactlyInAnyOrder( + shards[1].shardId(), shards[2].shardId(), shards[3].shardId()); + assertThat( + firstUpdateSplitAssignment.assignment().get(subtaskId).stream() + .map(DynamoDbStreamsShardSplit::getStartingPosition) + .map(StartingPosition::getShardIteratorType)) + .containsExactlyInAnyOrder( + TRIM_HORIZON, // child shard of completed shard should be read from + // TRIM_HORIZON + shardIteratorType, // other shards should be read from the configured + // position + shardIteratorType); + } + } + @Test void testAddSplitsBackThrowsException() throws Throwable { try (MockSplitEnumeratorContext context = @@ -328,7 +419,7 @@ void testAssignSplitsHandlesRepeatSplitsGracefully() throws Throwable { // When first periodic discovery runs // Then handled gracefully - assertThatNoException().isThrownBy(() -> context.runPeriodicCallable(0)); + assertThatNoException().isThrownBy(() -> context.runPeriodicCallable(1)); SplitsAssignment secondReturnedSplitAssignment = context.getSplitsAssignmentSequence().get(1); assertThat(secondReturnedSplitAssignment.assignment()).isEmpty(); @@ -381,7 +472,7 @@ void testAssignSplitWithoutRegisteredReaders() throws Throwable { enumerator.addReader(subtaskId); // When next periodic discovery is run - context.runPeriodicCallable(0); + context.runPeriodicCallable(1); SplitsAssignment initialSplitAssignment = context.getSplitsAssignmentSequence().get(0); @@ -450,7 +541,7 @@ void testAssignSplitWithInsufficientRegisteredReaders() throws Throwable { enumerator.addReader(secondSubtaskId); // When next periodic discovery is run - context.runPeriodicCallable(0); + context.runPeriodicCallable(1); SplitsAssignment initialSplitAssignment = context.getSplitsAssignmentSequence().get(0); @@ -630,7 +721,7 @@ private DynamoDbStreamsSourceEnumerator getSimpleEnumeratorWithNoState( null); enumerator.start(); 
assertThat(context.getOneTimeCallables()).hasSize(1); - assertThat(context.getPeriodicCallables()).hasSize(1); + assertThat(context.getPeriodicCallables()).hasSize(2); return enumerator; } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitGraphInconsistencyTrackerTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitGraphInconsistencyTrackerTest.java index 8127485b..54582948 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitGraphInconsistencyTrackerTest.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitGraphInconsistencyTrackerTest.java @@ -36,7 +36,7 @@ */ public class SplitGraphInconsistencyTrackerTest { @Test - public void testShardGraphTrackerHappyCase() { + public void testSplitGraphInconsistencyTrackerHappyCase() { List shards = Arrays.asList( // shards which don't have a parent @@ -51,6 +51,16 @@ public void testShardGraphTrackerHappyCase() { assertThat(splitGraphInconsistencyTracker.inconsistencyDetected()).isFalse(); assertThat(splitGraphInconsistencyTracker.getNodes()) .containsExactlyInAnyOrderElementsOf(shards); + assertThat(splitGraphInconsistencyTracker.getLatestLeafNode()) + .isEqualTo(generateShardId(3)); + } + + @Test + public void testSplitGraphInconsistencyTrackerWhenThereIsNoShardTracked() { + SplitGraphInconsistencyTracker splitGraphInconsistencyTracker = + new SplitGraphInconsistencyTracker(); + assertThat(splitGraphInconsistencyTracker.inconsistencyDetected()).isFalse(); + assertThat(splitGraphInconsistencyTracker.getLatestLeafNode()).isEqualTo(null); } @Test @@ -73,6 +83,8 @@ public void testShardGraphTrackerHappyCase() { .isEqualTo(generateShardId(secondShardIdTimestamp)); 
assertThat(splitGraphInconsistencyTracker.getNodes()) .containsExactlyInAnyOrderElementsOf(shards); + assertThat(splitGraphInconsistencyTracker.getLatestLeafNode()) + .isEqualTo(generateShardId(secondShardIdTimestamp)); } @Test @@ -92,5 +104,7 @@ public void testSplitGraphInconsistencyTrackerDetectsInconsistenciesAndDeletesCl assertThat(splitGraphInconsistencyTracker.inconsistencyDetected()).isFalse(); assertThat(splitGraphInconsistencyTracker.getNodes()) .containsExactlyInAnyOrder(shards.get(0), shards.get(2)); + assertThat(splitGraphInconsistencyTracker.getLatestLeafNode()) + .isEqualTo(generateShardId(secondShardIdTimestamp)); } } diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java index c9f45c19..6231036d 100644 --- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java +++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java @@ -58,7 +58,6 @@ public static class TestDynamoDbStreamsProxy implements StreamProxy { // List shards configuration private final List shards = new ArrayList<>(); private Supplier listShardsExceptionSupplier; - private String lastProvidedLastSeenShardId; // GetRecords configuration private Supplier getRecordsExceptionSupplier; @@ -68,8 +67,6 @@ public static class TestDynamoDbStreamsProxy implements StreamProxy { @Override public ListShardsResult listShards(String streamArn, @Nullable String lastSeenShardId) { - this.lastProvidedLastSeenShardId = lastSeenShardId; - if (listShardsExceptionSupplier != null) { try { throw listShardsExceptionSupplier.get(); @@ -79,7 +76,16 @@ public ListShardsResult listShards(String streamArn, 
@Nullable String lastSeenSh } ListShardsResult listShardsResult = new ListShardsResult(); - List results = new ArrayList<>(shards); + List results = + shards.stream() + .filter( + shard -> { + if (lastSeenShardId == null) { + return true; + } + return shard.shardId().compareTo(lastSeenShardId) > 0; + }) + .collect(Collectors.toList()); listShardsResult.addShards(results); return listShardsResult; }