From da1ff05b1bf260964429eb9c0dd52cd52202b3ee Mon Sep 17 00:00:00 2001 From: Aravinda Kidambi Srinivasan Date: Thu, 2 May 2024 11:16:38 -0700 Subject: [PATCH] Address review comments Move the check to initializeComplete method and also add some logs to the test --- .../kinesis/lifecycle/ShardConsumer.java | 40 +++++++++++-------- .../kinesis/lifecycle/ShardConsumerTest.java | 34 ++++++++++------ 2 files changed, 44 insertions(+), 30 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index b6e6dc1dd..d12483848 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -174,24 +174,20 @@ public void executeLifecycle() { if (isShutdownRequested()) { stateChangeFuture = shutdownComplete(); } else if (needsInitialization) { - if (stateChangeFuture != null && stateChangeFuture.get()) { - // Task rejection during the subscribe() call will not be propagated back as it not executed - // in the context of the Scheduler thread. Hence we should not assume the subscription will - // always be successful. - // But if subscription was not successful, then it will recover - // during healthCheck which will restart subscription. - // From Shardconsumer point of view, initialization after the below subscribe call - // is complete - subscribe(); - needsInitialization = false; - // Initialization is complete, we don't need to do initializeComplete anymore. - // ShardConsumer is already in ProcessingState and any further activity - // will be driven by publisher pushing data to subscriber which invokes handleInput - // and that triggers ProcessTask. Scheduler is only meant to do health-checks - // to ensure the consumer is not stuck for any reason and to do shutdown handling. - } else { - stateChangeFuture = initializeComplete(); + if (stateChangeFuture != null) { + if (stateChangeFuture.get()) { + // Task rejection during the subscribe() call will not be propagated back as it not executed + // in the context of the Scheduler thread. Hence we should not assume the subscription will + // always be successful. + // But if subscription was not successful, then it will recover + // during healthCheck which will restart subscription. + // From Shardconsumer point of view, initialization after the below subscribe call + // is complete + subscribe(); + needsInitialization = false; + } } + stateChangeFuture = initializeComplete(); } } catch (InterruptedException e) { // @@ -284,6 +280,16 @@ void subscribe() { @VisibleForTesting synchronized CompletableFuture initializeComplete() { + if (!needsInitialization) { + // initialization already complete, this must be a no-op. + // ShardConsumer must be in ProcessingState and + // any further activity will be driven by publisher pushing data to subscriber + // which invokes handleInput and that triggers ProcessTask. + // Scheduler is only meant to do health-checks to ensure the consumer + // is not stuck for any reason and to do shutdown handling. + return CompletableFuture.completedFuture(true); + } + if (taskOutcome != null) { updateState(taskOutcome); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index 82acbf5e1..184634b46 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -868,26 +868,31 @@ public void testEmptyShardProcessingRaceCondition() throws Exception { when(mockState.taskType()).thenReturn(TaskType.BLOCK_ON_PARENT_SHARDS); ConsumerTask mockTask = mock(ConsumerTask.class); when(mockState.createTask(any(), any(), any())).thenReturn(mockTask); + // Simulate successful BlockedOnParent task execution + // and successful Initialize task execution when(mockTask.call()).thenReturn(new TaskResult(false)); - // Invoke async processing of blocked on parent task + log.info("Scheduler Thread: Invoking ShardConsumer.executeLifecycle() to initiate async" + + " processing of blocked on parent task"); consumer.executeLifecycle(); ArgumentCaptor taskToExecute = ArgumentCaptor.forClass(Runnable.class); verify(mockExecutor, timeout(100)).execute(taskToExecute.capture()); taskToExecute.getValue().run(); + log.info("RecordProcessor Thread: Simulated successful execution of Blocked on parent task"); reset(mockExecutor); - // move to initializing state and - // Invoke async processing of initialize state + log.info("Scheduler Thread: Invoking ShardConsumer.executeLifecycle() to move to InitializingState" + + " and initiate async processing of initialize task"); when(mockState.successTransition()).thenReturn(mockState); when(mockState.state()).thenReturn(ShardConsumerState.INITIALIZING); when(mockState.taskType()).thenReturn(TaskType.INITIALIZE); consumer.executeLifecycle(); verify(mockExecutor, timeout(100)).execute(taskToExecute.capture()); + log.info("RecordProcessor Thread: Simulated successful execution of Initialize task"); taskToExecute.getValue().run(); - // Move to processing state - // and complete initialization future successfully + log.info("Scheduler Thread: Invoking ShardConsumer.executeLifecycle() to move to ProcessingState" + + " and mark initialization future as complete"); when(mockState.state()).thenReturn(ShardConsumerState.PROCESSING); consumer.executeLifecycle(); @@ -908,17 +913,18 @@ public void testEmptyShardProcessingRaceCondition() throws Exception { when(mockState.taskType()).thenReturn(TaskType.PROCESS); ConsumerTask mockProcessTask = mock(ConsumerTask.class); when(mockState.createTask(any(), any(), any())).thenReturn(mockProcessTask); - CountDownLatch waitForSubscribeLatch = new CountDownLatch(1); when(mockProcessTask.call()).then(input -> { // first we want to wait for subscribe to be called, // but we cannot control the timing, so wait for 10 seconds // to let the main thread invoke executeLifecyle which // will perform subscribe processTaskLatch.countDown(); - log.info("Waiting for countdown latch"); - waitForSubscribeLatch.await(10, TimeUnit.SECONDS); - log.info("Waiting for countdown latch - DONE"); + log.info("Record Processor Thread: Holding shardConsumer lock, waiting for 10 seconds to" + + " let subscribe be called by scheduler thread"); + Thread.sleep(10 * 1000); + log.info("RecordProcessor Thread: Done waiting"); // then return shard end result + log.info("RecordProcessor Thread: Simulating execution of ProcessTask and returning shard-end result"); return new TaskResult(true); }); Subscription mockSubscription = mock(Subscription.class); @@ -927,10 +933,10 @@ public void testEmptyShardProcessingRaceCondition() throws Exception { processTaskLatch.await(); - // now invoke lifecycle which should invoke subscribe - // but since we cannot countdown the latch, the latch will timeout + // invoke executeLifecycle, which should invoke subscribe // meanwhile if scheduler tries to acquire the ShardConsumer lock it will - // be blocked during initialization processing. Thereby creating the + // be blocked during initialization processing because handleInput was + // already invoked and will be holding the lock. Thereby creating the // race condition we want. reset(mockState); AtomicBoolean successTransitionCalled = new AtomicBoolean(false); @@ -949,13 +955,15 @@ public void testEmptyShardProcessingRaceCondition() throws Exception { } return ShardConsumerState.PROCESSING; }); + log.info("Scheduler Thread: Invoking ShardConsumer.executeLifecycle() to invoke subscribe and" + + " complete initialization"); consumer.executeLifecycle(); // initialization should be done by now, make sure shard consumer did not // perform shutdown processing yet. + log.info("Verifying scheduler did not perform shutdown transition during initialization"); verify(mockState, times(0)).shutdownTransition(any()); } - private void mockSuccessfulShutdown(CyclicBarrier taskCallBarrier) { mockSuccessfulShutdown(taskCallBarrier, null); }