Skip to content

Commit

Permalink
Ignore dynamically added streams with mismatching region, rather than…
Browse files Browse the repository at this point in the history
… propagating exception
  • Loading branch information
furq-aws committed May 3, 2024
1 parent 16e8404 commit b16d99b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public class Scheduler implements Runnable {
private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count";
private static final String NON_EXISTING_STREAM_DELETE_COUNT = "NonExistingStreamDelete.Count";
private static final String IGNORED_STREAMS_COUNT = "IgnoredStreams.Count";

private final SchedulerLog slog = new SchedulerLog();

Expand Down Expand Up @@ -507,8 +508,14 @@ Set<StreamIdentifier> checkAndSyncStreamShardsAndLeases()
log.info("Found new stream to process: {}. Syncing shards of that stream.", streamConfig);
ShardSyncTaskManager shardSyncTaskManager = createOrGetShardSyncTaskManager(streamConfig);
shardSyncTaskManager.submitShardSyncTask();
currentStreamConfigMap.put(streamIdentifier, streamConfig);
streamsSynced.add(streamIdentifier);
try {
currentStreamConfigMap.put(streamIdentifier, streamConfig);
streamsSynced.add(streamIdentifier);
} catch (Exception e) {
log.error("Failed to add stream {} to application. This stream will not be processed.",
streamConfig.streamIdentifier(), e);
MetricsUtil.addCount(metricsScope, IGNORED_STREAMS_COUNT, 1, MetricsLevel.DETAILED);
}
} else {
log.debug("{} is already being processed - skipping shard sync.", streamIdentifier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1154,7 +1154,7 @@ public void testOrphanStreamConfigIsPopulatedWithArn() {
}

@Test
public void testMismatchingArnRegionAndKinesisClientRegionThrowsException() {
public void testMismatchingArnRegionAndKinesisClientRegionOnSchedulerConstructionThrowsException() {
final Region streamArnRegion = Region.US_WEST_1;
Assert.assertNotEquals(streamArnRegion, kinesisClient.serviceClientConfiguration().region());

Expand All @@ -1169,6 +1169,35 @@ public void testMismatchingArnRegionAndKinesisClientRegionThrowsException() {
leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig));
}

@Test
public void testDynamicallyAddedStreamsWithRegionMismatchingKinesisClientRegionAreIgnored() throws Exception {
final Region mismatchingStreamRegion = Region.US_WEST_1;
final Region kinesisClientRegion = kinesisClient.serviceClientConfiguration().region();
Assert.assertNotEquals(mismatchingStreamRegion, kinesisClientRegion);

final StreamIdentifier streamWithMismatchingRegion = StreamIdentifier.multiStreamInstance(
Arn.fromString(constructStreamArnStr(mismatchingStreamRegion, TEST_ACCOUNT, "stream-1")), TEST_EPOCH);

final StreamIdentifier streamWithMatchingRegion = StreamIdentifier.multiStreamInstance(
Arn.fromString(constructStreamArnStr(kinesisClientRegion, TEST_ACCOUNT, "stream-2")), TEST_EPOCH);

when(multiStreamTracker.streamConfigList()).thenReturn(
Collections.emptyList(), // returned on scheduler construction
Arrays.asList( // returned on stream sync
new StreamConfig(streamWithMismatchingRegion, TEST_INITIAL_POSITION),
new StreamConfig(streamWithMatchingRegion, TEST_INITIAL_POSITION)));
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig,
leaseManagementConfig, lifecycleConfig, metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);

final Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
final Set<StreamIdentifier> currentStreamConfigMapKeys = scheduler.currentStreamConfigMap().keySet();
assertFalse(Sets.union(currentStreamConfigMapKeys, syncedStreams).contains(streamWithMismatchingRegion));
assertTrue(Sets.union(currentStreamConfigMapKeys, syncedStreams).contains(streamWithMatchingRegion));
}

private static String constructStreamIdentifierSer(long accountId, String streamName) {
return String.join(":", String.valueOf(accountId), streamName, String.valueOf(TEST_EPOCH));
}
Expand Down

0 comments on commit b16d99b

Please sign in to comment.