Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Move the check to initializeComplete method and also add some logs
to the test
  • Loading branch information
akidambisrinivasan committed May 2, 2024
1 parent 940f93b commit da1ff05
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,24 +174,20 @@ public void executeLifecycle() {
if (isShutdownRequested()) {
stateChangeFuture = shutdownComplete();
} else if (needsInitialization) {
if (stateChangeFuture != null && stateChangeFuture.get()) {
// Task rejection during the subscribe() call will not be propagated back as it not executed
// in the context of the Scheduler thread. Hence we should not assume the subscription will
// always be successful.
// But if subscription was not successful, then it will recover
// during healthCheck which will restart subscription.
// From Shardconsumer point of view, initialization after the below subscribe call
// is complete
subscribe();
needsInitialization = false;
// Initialization is complete, we don't need to do initializeComplete anymore.
// ShardConsumer is already in ProcessingState and any further activity
// will be driven by publisher pushing data to subscriber which invokes handleInput
// and that triggers ProcessTask. Scheduler is only meant to do health-checks
// to ensure the consumer is not stuck for any reason and to do shutdown handling.
} else {
stateChangeFuture = initializeComplete();
if (stateChangeFuture != null) {
if (stateChangeFuture.get()) {
// Task rejection during the subscribe() call will not be propagated back as it not executed
// in the context of the Scheduler thread. Hence we should not assume the subscription will
// always be successful.
// But if subscription was not successful, then it will recover
// during healthCheck which will restart subscription.
// From Shardconsumer point of view, initialization after the below subscribe call
// is complete
subscribe();
needsInitialization = false;
}
}
stateChangeFuture = initializeComplete();
}
} catch (InterruptedException e) {
//
Expand Down Expand Up @@ -284,6 +280,16 @@ void subscribe() {

@VisibleForTesting
synchronized CompletableFuture<Boolean> initializeComplete() {
if (!needsInitialization) {
// initialization already complete, this must be a no-op.
// ShardConsumer must be in ProcessingState and
// any further activity will be driven by publisher pushing data to subscriber
// which invokes handleInput and that triggers ProcessTask.
// Scheduler is only meant to do health-checks to ensure the consumer
// is not stuck for any reason and to do shutdown handling.
return CompletableFuture.completedFuture(true);
}

if (taskOutcome != null) {
updateState(taskOutcome);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -868,26 +868,31 @@ public void testEmptyShardProcessingRaceCondition() throws Exception {
when(mockState.taskType()).thenReturn(TaskType.BLOCK_ON_PARENT_SHARDS);
ConsumerTask mockTask = mock(ConsumerTask.class);
when(mockState.createTask(any(), any(), any())).thenReturn(mockTask);
// Simulate successful BlockedOnParent task execution
// and successful Initialize task execution
when(mockTask.call()).thenReturn(new TaskResult(false));

// Invoke async processing of blocked on parent task
log.info("Scheduler Thread: Invoking ShardConsumer.executeLifecycle() to initiate async" +
" processing of blocked on parent task");
consumer.executeLifecycle();
ArgumentCaptor<Runnable> taskToExecute = ArgumentCaptor.forClass(Runnable.class);
verify(mockExecutor, timeout(100)).execute(taskToExecute.capture());
taskToExecute.getValue().run();
log.info("RecordProcessor Thread: Simulated successful execution of Blocked on parent task");
reset(mockExecutor);

// move to initializing state and
// Invoke async processing of initialize state
log.info("Scheduler Thread: Invoking ShardConsumer.executeLifecycle() to move to InitializingState" +
" and initiate async processing of initialize task");
when(mockState.successTransition()).thenReturn(mockState);
when(mockState.state()).thenReturn(ShardConsumerState.INITIALIZING);
when(mockState.taskType()).thenReturn(TaskType.INITIALIZE);
consumer.executeLifecycle();
verify(mockExecutor, timeout(100)).execute(taskToExecute.capture());
log.info("RecordProcessor Thread: Simulated successful execution of Initialize task");
taskToExecute.getValue().run();

// Move to processing state
// and complete initialization future successfully
log.info("Scheduler Thread: Invoking ShardConsumer.executeLifecycle() to move to ProcessingState" +
" and mark initialization future as complete");
when(mockState.state()).thenReturn(ShardConsumerState.PROCESSING);
consumer.executeLifecycle();

Expand All @@ -908,17 +913,18 @@ public void testEmptyShardProcessingRaceCondition() throws Exception {
when(mockState.taskType()).thenReturn(TaskType.PROCESS);
ConsumerTask mockProcessTask = mock(ConsumerTask.class);
when(mockState.createTask(any(), any(), any())).thenReturn(mockProcessTask);
CountDownLatch waitForSubscribeLatch = new CountDownLatch(1);
when(mockProcessTask.call()).then(input -> {
// first we want to wait for subscribe to be called,
// but we cannot control the timing, so wait for 10 seconds
// to let the main thread invoke executeLifecyle which
// will perform subscribe
processTaskLatch.countDown();
log.info("Waiting for countdown latch");
waitForSubscribeLatch.await(10, TimeUnit.SECONDS);
log.info("Waiting for countdown latch - DONE");
log.info("Record Processor Thread: Holding shardConsumer lock, waiting for 10 seconds to" +
" let subscribe be called by scheduler thread");
Thread.sleep(10 * 1000);
log.info("RecordProcessor Thread: Done waiting");
// then return shard end result
log.info("RecordProcessor Thread: Simulating execution of ProcessTask and returning shard-end result");
return new TaskResult(true);
});
Subscription mockSubscription = mock(Subscription.class);
Expand All @@ -927,10 +933,10 @@ public void testEmptyShardProcessingRaceCondition() throws Exception {

processTaskLatch.await();

// now invoke lifecycle which should invoke subscribe
// but since we cannot countdown the latch, the latch will timeout
// invoke executeLifecycle, which should invoke subscribe
// meanwhile if scheduler tries to acquire the ShardConsumer lock it will
// be blocked during initialization processing. Thereby creating the
// be blocked during initialization processing because handleInput was
// already invoked and will be holding the lock. Thereby creating the
// race condition we want.
reset(mockState);
AtomicBoolean successTransitionCalled = new AtomicBoolean(false);
Expand All @@ -949,13 +955,15 @@ public void testEmptyShardProcessingRaceCondition() throws Exception {
}
return ShardConsumerState.PROCESSING;
});
log.info("Scheduler Thread: Invoking ShardConsumer.executeLifecycle() to invoke subscribe and" +
" complete initialization");
consumer.executeLifecycle();
// initialization should be done by now, make sure shard consumer did not
// perform shutdown processing yet.
log.info("Verifying scheduler did not perform shutdown transition during initialization");
verify(mockState, times(0)).shutdownTransition(any());
}


private void mockSuccessfulShutdown(CyclicBarrier taskCallBarrier) {
mockSuccessfulShutdown(taskCallBarrier, null);
}
Expand Down

0 comments on commit da1ff05

Please sign in to comment.