Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a race condition between ShardConsumer shutdown and initialization #1319

Merged
merged 6 commits into from
May 2, 2024

Conversation

akidambisrinivasan
Copy link
Contributor

When Kinesis shards have no data, there can be a race condition where the shard-end record processing from RecordProcessorThread interleaves with Scheduler performing initialization. This leads to ShardConsumer making incorrect state transition during initialization (moves from PROCESSING -> SHUTTING_DOWN) state and during shutdown handling it moves from SHUTTING_DOWN -> SHUTDOWN_COMPLETE without running the ShutdownTask.

This can cause the ShardConsumer to not perform proper shutdown processing that is required for a child shard processing to be unblocked. So the child shard could be blocked forever unless the lease for the parent shard moves to a new worker and that worker does not run into the race condition.

This patch fixes the race condition as follows:

The intializationComplete invocation is not needed after needsInitialization has been set to false. Because initializationComplete is mean to perform initialization in an async manner, but once its done, the async task is a no-op in happy-path, but it can perform incorrect state transition during a race condition.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Issue: #837

When Kinesis shards have no data, there can be a race condition where
the shard-end record processing from RecordProcessorThread
interleaves with Scheduler performing initialization.
This leads to ShardConsumer making incorrect state transition
during initialization (moves from PROCESSING -> SHUTTING_DOWN) state
and during shutdown handling it moves from SHUTTING_DOWN -> SHUTDOWN_COMPLETE
without running the ShutdownTask.

This can cause the ShardConsumer to not perform proper shutdown
processing that is required for a child shard processing
to be unblocked. So the child shard could be blocked forever unless the
lease for the parent shard moves to a new worker and that worker does
not run into the race condition.

This patch fixes the race condition as follows:

The intializationComplete invocation is not needed after
needsInitialization has been set to false. Because initializationComplete
is mean to perform initialization in an async manner, but once
its done, the async task is a no-op in happy-path, but it can
perform incorrect state transition during a race condition.
Copy link
Contributor

@brendan-p-lynch brendan-p-lynch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a unit test we can add for this so that we can make sure this change is tested. Otherwise looks good

@akidambisrinivasan
Copy link
Contributor Author

Is there a unit test we can add for this so that we can make sure this change is tested. Otherwise looks good

Im looking into it, its hard to reproduce the race in UT because the ShardConsumer constructs its own subscriber. Wanted to get the fix in first and I am continuing to investigate if UT is possible, will add it in a separate patch, if possible.

Also add a unit test to test the changes
Move the check to initializeComplete method and also add some logs
to the test
Comment on lines +884 to +885
log.info("Scheduler Thread: Invoking ShardConsumer.executeLifecycle() to move to InitializingState" +
" and initiate async processing of initialize task");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need logs in unit tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes its a complex unit test, the logs help for someone to follow along.

@@ -848,6 +856,114 @@ public void testLongRunningTasks() throws Exception {
verifyNoMoreInteractions(taskExecutionListener);
}

@Test
public void testEmptyShardProcessingRaceCondition() throws Exception {
RecordsPublisher mockPublisher = mock(RecordsPublisher.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

np : make all variable finals ?

abhit17
abhit17 previously approved these changes May 2, 2024
@akidambisrinivasan akidambisrinivasan merged commit 16e8404 into awslabs:master May 2, 2024
1 of 2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants