diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PendingUpdateAction.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PendingUpdateAction.java index 679e84d6d0e0..382f572351f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PendingUpdateAction.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PendingUpdateAction.java @@ -25,6 +25,7 @@ public class PendingUpdateAction { enum Action { UPDATE_INPUT_PARTITIONS, + CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS, RECYCLE, SUSPEND, ADD_BACK, @@ -48,6 +49,11 @@ public static PendingUpdateAction createUpdateInputPartition(final Set inputPartitions) { + Objects.requireNonNull(inputPartitions, "Set of input partitions to update is null!"); + return new PendingUpdateAction(Action.CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS, inputPartitions); + } + public static PendingUpdateAction createRecycleTask(final Set inputPartitions) { Objects.requireNonNull(inputPartitions, "Set of input partitions to update is null!"); return new PendingUpdateAction(Action.RECYCLE, inputPartitions); @@ -66,7 +72,7 @@ public static PendingUpdateAction createCloseClean() { } public Set getInputPartitions() { - if (action != Action.UPDATE_INPUT_PARTITIONS && action != Action.RECYCLE) { + if (action != Action.UPDATE_INPUT_PARTITIONS && action != Action.CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS && action != Action.RECYCLE) { throw new IllegalStateException("Action type " + action + " does not have a set of input partitions!"); } return inputPartitions; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 888991b0a949..f3603b2a3160 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -543,8 +543,12 @@ private void handleTasksInStateUpdater(final Map> ac if (activeTasksToCreate.containsKey(taskId)) { final Set inputPartitions = activeTasksToCreate.get(taskId); if (task.isActive() && !task.inputPartitions().equals(inputPartitions)) { - stateUpdater.remove(taskId); - tasks.addPendingTaskToUpdateInputPartitions(taskId, inputPartitions); + if (tasks.removePendingTaskToCloseClean(taskId)) { + tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(taskId, inputPartitions); + } else { + tasks.addPendingTaskToUpdateInputPartitions(taskId, inputPartitions); + stateUpdater.remove(taskId); + } } else if (task.isActive()) { tasks.removePendingActiveTaskToSuspend(taskId); if (tasks.removePendingTaskToCloseClean(taskId)) { @@ -819,15 +823,17 @@ private void recycleTaskFromStateUpdater(final Task task, } } - private void closeTaskClean(final Task task, - final Set tasksToCloseDirty, - final Map taskExceptions) { + /** Returns true if the task closed clean */ + private boolean closeTaskClean(final Task task, + final Set tasksToCloseDirty, + final Map taskExceptions) { try { task.suspend(); task.closeClean(); if (task.isActive()) { activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id()); } + return true; } catch (final RuntimeException e) { final String uncleanMessage = String.format("Failed to close task %s cleanly. " + "Attempting to close remaining tasks before re-throwing:", task.id()); @@ -838,6 +844,7 @@ private void closeTaskClean(final Task task, } taskExceptions.putIfAbsent(task.id(), e); + return false; } } @@ -916,6 +923,12 @@ private void handleRemovedTasksFromStateUpdater() { stateUpdater.add(task); } else if (tasks.removePendingTaskToCloseClean(task.id())) { closeTaskClean(task, tasksToCloseDirty, taskExceptions); + } else if ((inputPartitions = tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != null) { + if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) { + task.revive(); + task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); + addTaskToStateUpdater(task); + } } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) { task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); stateUpdater.add(task); @@ -950,6 +963,12 @@ private void handleRestoredTasksFromStateUpdater(final long now, closeTaskClean(task, tasksToCloseDirty, taskExceptions); } else if (tasks.removePendingTaskToAddBack(task.id())) { stateUpdater.add(task); + } else if ((inputPartitions = tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != null) { + if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) { + task.revive(); + task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); + addTaskToStateUpdater(task); + } } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) { task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); transitRestoredTaskToRunning(task, now, offsetResetter); @@ -1306,7 +1325,7 @@ private void closeTaskDirty(final Task task, final boolean removeFromTasksRegist // before suspending and closing the topology task.prepareCommit(); } catch (final RuntimeException swallow) { - log.error("Error flushing caches of dirty task {} ", task.id(), swallow); + log.error("Error flushing caches of dirty task {}", task.id(), swallow); } try { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java index f87713ee5324..baeaeb70c5c0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java @@ -120,6 +120,19 @@ public boolean hasPendingTasksToRecycle() { return pendingUpdateActions.values().stream().anyMatch(action -> action.getAction() == Action.RECYCLE); } + @Override + public Set removePendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId) { + if (containsTaskIdWithAction(taskId, Action.CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS)) { + return pendingUpdateActions.remove(taskId).getInputPartitions(); + } + return null; + } + + @Override + public void addPendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId, final Set inputPartitions) { + pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseReviveAndUpdateInputPartition(inputPartitions)); + } + @Override public Set removePendingTaskToUpdateInputPartitions(final TaskId taskId) { if (containsTaskIdWithAction(taskId, Action.UPDATE_INPUT_PARTITIONS)) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java index 18064797d3a9..ecf8816e4cb6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java @@ -41,6 +41,10 @@ public interface TasksRegistry { void addPendingTaskToRecycle(final TaskId taskId, final Set inputPartitions); + Set removePendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId); + + void addPendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId, final Set inputPartitions); + Set removePendingTaskToUpdateInputPartitions(final TaskId taskId); void addPendingTaskToUpdateInputPartitions(final TaskId taskId, final Set inputPartitions); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 410ad213d375..21f5f9561d9d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -107,6 +107,7 @@ import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY; import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask; import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask; +import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; @@ -529,6 +530,29 @@ public void shouldUpdateInputPartitionOfActiveTaskInStateUpdater() { Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); } + @Test + public void shouldCloseReviveAndUpdateInputPartitionOfActiveTaskInStateUpdater() { + final StreamTask activeTaskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId03Partitions).build(); + final Set newInputPartitions = taskId02Partitions; + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + when(stateUpdater.getTasks()).thenReturn(mkSet(activeTaskToUpdateInputPartitions)); + when(tasks.removePendingTaskToCloseClean(activeTaskToUpdateInputPartitions.id())).thenReturn(true); + + taskManager.handleAssignment( + mkMap(mkEntry(activeTaskToUpdateInputPartitions.id(), newInputPartitions)), + Collections.emptyMap() + ); + + Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); + Mockito.verify(stateUpdater, never()).remove(activeTaskToUpdateInputPartitions.id()); + Mockito.verify(tasks).removePendingTaskToCloseClean(activeTaskToUpdateInputPartitions.id()); + Mockito.verify(tasks).addPendingTaskToCloseReviveAndUpdateInputPartitions(activeTaskToUpdateInputPartitions.id(), newInputPartitions); + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); + } + @Test public void shouldKeepReassignedActiveTaskInStateUpdater() { final StreamTask reassignedActiveTask = statefulTask(taskId03, taskId03ChangelogPartitions) @@ -629,6 +653,29 @@ public void shouldNeverUpdateInputPartitionsOfStandbyTaskInStateUpdater() { Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); } + @Test + public void shouldNeverCloseReviveAndUpdateInputPartitionsOfStandbyTaskInStateUpdater() { + final StandbyTask standbyTaskToUpdateInputPartitions = standbyTask(taskId02, taskId02ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId02Partitions).build(); + final Set newInputPartitions = taskId03Partitions; + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToUpdateInputPartitions)); + + taskManager.handleAssignment( + Collections.emptyMap(), + mkMap(mkEntry(standbyTaskToUpdateInputPartitions.id(), newInputPartitions)) + ); + + Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); + Mockito.verify(stateUpdater, never()).remove(standbyTaskToUpdateInputPartitions.id()); + Mockito.verify(tasks, never()).removePendingTaskToCloseClean(standbyTaskToUpdateInputPartitions.id()); + Mockito.verify(tasks, never()) + .addPendingTaskToCloseReviveAndUpdateInputPartitions(standbyTaskToUpdateInputPartitions.id(), newInputPartitions); + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); + } + @Test public void shouldKeepReassignedStandbyTaskInStateUpdater() { final StandbyTask reassignedStandbyTask = standbyTask(taskId02, taskId02ChangelogPartitions) @@ -1101,30 +1148,53 @@ public void shouldCloseTasksRemovedFromStateUpdater() { @Test public void shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() { - final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + final StreamTask activeTask = statefulTask(taskId00, taskId00ChangelogPartitions) .withInputPartitions(taskId00Partitions) .inState(State.RESTORING).build(); - final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions) + final StandbyTask standbyTask = standbyTask(taskId01, taskId01ChangelogPartitions) .withInputPartitions(taskId01Partitions) .inState(State.RUNNING).build(); when(stateUpdater.hasRemovedTasks()).thenReturn(true); - when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01)); + when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(activeTask, standbyTask)); final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.removePendingTaskToRecycle(any())).thenReturn(null); - when(tasks.removePendingTaskToUpdateInputPartitions(task00.id())).thenReturn(taskId02Partitions); - when(tasks.removePendingTaskToUpdateInputPartitions(task01.id())).thenReturn(taskId03Partitions); + when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(any())).thenReturn(null); + when(tasks.removePendingTaskToUpdateInputPartitions(activeTask.id())).thenReturn(taskId02Partitions); + when(tasks.removePendingTaskToUpdateInputPartitions(standbyTask.id())).thenReturn(taskId03Partitions); taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true); taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); - Mockito.verify(task00).updateInputPartitions(Mockito.eq(taskId02Partitions), anyMap()); - Mockito.verify(task00, never()).closeDirty(); - Mockito.verify(task00, never()).closeClean(); - Mockito.verify(stateUpdater).add(task00); - Mockito.verify(task01).updateInputPartitions(Mockito.eq(taskId03Partitions), anyMap()); - Mockito.verify(task01, never()).closeDirty(); - Mockito.verify(task01, never()).closeClean(); - Mockito.verify(stateUpdater).add(task01); + Mockito.verify(activeTask).updateInputPartitions(Mockito.eq(taskId02Partitions), anyMap()); + Mockito.verify(activeTask, never()).closeDirty(); + Mockito.verify(activeTask, never()).closeClean(); + Mockito.verify(stateUpdater).add(activeTask); + Mockito.verify(standbyTask).updateInputPartitions(Mockito.eq(taskId03Partitions), anyMap()); + Mockito.verify(standbyTask, never()).closeDirty(); + Mockito.verify(standbyTask, never()).closeClean(); + Mockito.verify(stateUpdater).add(standbyTask); + } + + @Test + public void shouldCloseReviveAndUpdateInputPartitionsOfTasksRemovedFromStateUpdater() { + final StreamTask activeTask = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RESTORING).build(); + when(stateUpdater.hasRemovedTasks()).thenReturn(true); + when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(activeTask)); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.removePendingTaskToRecycle(any())).thenReturn(null); + when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(activeTask.id())).thenReturn(taskId02Partitions); + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true); + + taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); + + Mockito.verify(activeTask).closeClean(); + Mockito.verify(activeTask).revive(); + Mockito.verify(activeTask).updateInputPartitions(Mockito.eq(taskId02Partitions), anyMap()); + Mockito.verify(activeTask).initializeIfNeeded(); + Mockito.verify(activeTask, never()).closeDirty(); + Mockito.verify(stateUpdater).add(activeTask); } @Test @@ -1135,6 +1205,7 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.removePendingTaskToRecycle(statefulTask.id())).thenReturn(null); when(tasks.removePendingTaskToUpdateInputPartitions(statefulTask.id())).thenReturn(null); + when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(statefulTask.id())).thenReturn(null); when(tasks.removePendingActiveTaskToSuspend(statefulTask.id())).thenReturn(true); when(stateUpdater.hasRemovedTasks()).thenReturn(true); when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask)); @@ -1164,9 +1235,12 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() { final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions) .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); + final StreamTask taskToCloseReviveAndUpdateInputPartitions = statefulTask(taskId04, taskId04ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId04Partitions).build(); when(stateUpdater.hasRemovedTasks()).thenReturn(true); when(stateUpdater.drainRemovedTasks()) - .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions)); + .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions, taskToCloseReviveAndUpdateInputPartitions)); when(stateUpdater.restoresActiveTasks()).thenReturn(true); when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, taskId01Partitions, consumer)) .thenReturn(convertedTask1); @@ -1184,6 +1258,10 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() { argThat(taskId -> !taskId.equals(taskToRecycle0.id()) && !taskId.equals(taskToRecycle1.id()))) ).thenReturn(null); when(tasks.removePendingTaskToUpdateInputPartitions(taskToUpdateInputPartitions.id())).thenReturn(taskId04Partitions); + when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(taskToCloseReviveAndUpdateInputPartitions.id())).thenReturn(taskId05Partitions); + when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions( + argThat(taskId -> !taskId.equals(taskToCloseReviveAndUpdateInputPartitions.id())) + )).thenReturn(null); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); taskManager.setMainConsumer(consumer); replay(consumer); @@ -1191,7 +1269,7 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() { taskManager.checkStateUpdater(time.milliseconds(), noOpResetter -> { }); verify(consumer); - Mockito.verify(activeTaskCreator, times(2)).closeAndRemoveTaskProducerIfNeeded(any()); + Mockito.verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any()); Mockito.verify(convertedTask0).initializeIfNeeded(); Mockito.verify(convertedTask1).initializeIfNeeded(); Mockito.verify(stateUpdater).add(convertedTask0); @@ -1199,6 +1277,11 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() { Mockito.verify(taskToClose).closeClean(); Mockito.verify(taskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(taskId04Partitions), anyMap()); Mockito.verify(stateUpdater).add(taskToUpdateInputPartitions); + Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).closeClean(); + Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).revive(); + Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).updateInputPartitions(Mockito.eq(taskId05Partitions), anyMap()); + Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).initializeIfNeeded(); + Mockito.verify(stateUpdater).add(taskToCloseReviveAndUpdateInputPartitions); } @Test @@ -1368,6 +1451,7 @@ private TaskManager setUpTransitionToRunningOfRestoredTask(final StreamTask stat final TasksRegistry tasks) { when(tasks.removePendingTaskToRecycle(statefulTask.id())).thenReturn(null); when(tasks.removePendingTaskToUpdateInputPartitions(statefulTask.id())).thenReturn(null); + when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(statefulTask.id())).thenReturn(null); when(stateUpdater.restoresActiveTasks()).thenReturn(true); when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet(statefulTask)); @@ -1545,6 +1629,7 @@ public void shouldUpdateInputPartitionsOfRestoredTask() { .withInputPartitions(taskId00Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.removePendingTaskToRecycle(statefulTask.id())).thenReturn(null); + when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(statefulTask.id())).thenReturn(null); when(tasks.removePendingTaskToUpdateInputPartitions(statefulTask.id())).thenReturn(taskId01Partitions); when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet(statefulTask)); when(stateUpdater.restoresActiveTasks()).thenReturn(true); @@ -1561,6 +1646,27 @@ public void shouldUpdateInputPartitionsOfRestoredTask() { Mockito.verify(tasks).addTask(statefulTask); } + @Test + public void shouldCloseReviveAndUpdateInputPartitionsOfRestoredTask() { + final StreamTask statefulTask = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId00Partitions).build(); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.removePendingTaskToRecycle(statefulTask.id())).thenReturn(null); + when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(statefulTask.id())).thenReturn(taskId01Partitions); + when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet(statefulTask)); + when(stateUpdater.restoresActiveTasks()).thenReturn(true); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + + taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); + + Mockito.verify(statefulTask).updateInputPartitions(Mockito.eq(taskId01Partitions), anyMap()); + Mockito.verify(statefulTask).closeClean(); + Mockito.verify(statefulTask).revive(); + Mockito.verify(statefulTask).initializeIfNeeded(); + Mockito.verify(stateUpdater).add(statefulTask); + } + @Test public void shouldSuspendRestoredTaskIfRevoked() { final StreamTask statefulTask = statefulTask(taskId00, taskId00ChangelogPartitions) @@ -1569,6 +1675,7 @@ public void shouldSuspendRestoredTaskIfRevoked() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.removePendingTaskToRecycle(statefulTask.id())).thenReturn(null); when(tasks.removePendingTaskToUpdateInputPartitions(statefulTask.id())).thenReturn(null); + when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(statefulTask.id())).thenReturn(null); when(tasks.removePendingActiveTaskToSuspend(statefulTask.id())).thenReturn(true); when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet(statefulTask)); when(stateUpdater.restoresActiveTasks()).thenReturn(true); @@ -1602,6 +1709,9 @@ public void shouldHandleMultipleRestoredTasks() { final StreamTask taskToUpdateInputPartitions = statefulTask(taskId04, taskId04ChangelogPartitions) .inState(State.RESTORING) .withInputPartitions(taskId04Partitions).build(); + final StreamTask taskToCloseReviveAndUpdateInputPartitions = statelessTask(taskId05) + .inState(State.RESTORING) + .withInputPartitions(taskId05Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle, taskToRecycle.inputPartitions())) @@ -1622,13 +1732,18 @@ public void shouldHandleMultipleRestoredTasks() { when(tasks.removePendingTaskToUpdateInputPartitions( argThat(taskId -> !taskId.equals(taskToUpdateInputPartitions.id()))) ).thenReturn(null); + when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(taskToCloseReviveAndUpdateInputPartitions.id())).thenReturn(taskId04Partitions); + when(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions( + argThat(taskId -> !taskId.equals(taskToCloseReviveAndUpdateInputPartitions.id()))) + ).thenReturn(null); when(stateUpdater.restoresActiveTasks()).thenReturn(true); when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(mkSet( taskToTransitToRunning, taskToRecycle, taskToCloseClean, taskToAddBack, - taskToUpdateInputPartitions + taskToUpdateInputPartitions, + taskToCloseReviveAndUpdateInputPartitions )); taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); @@ -1639,6 +1754,10 @@ public void shouldHandleMultipleRestoredTasks() { Mockito.verify(taskToCloseClean).closeClean(); Mockito.verify(stateUpdater).add(taskToAddBack); Mockito.verify(taskToUpdateInputPartitions).updateInputPartitions(Mockito.eq(taskId05Partitions), anyMap()); + Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).closeClean(); + Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).revive(); + Mockito.verify(taskToCloseReviveAndUpdateInputPartitions).initializeIfNeeded(); + Mockito.verify(stateUpdater).add(taskToCloseReviveAndUpdateInputPartitions); } @Test @@ -1726,11 +1845,11 @@ public void shouldRethrowTaskCorruptedExceptionFromStateUpdater() { @Test public void shouldRethrowTaskCorruptedExceptionFromInitialization() { final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions) - .inState(State.CREATED) - .withInputPartitions(taskId00Partitions).build(); + .inState(State.CREATED) + .withInputPartitions(taskId00Partitions).build(); final StreamTask statefulTask1 = statefulTask(taskId01, taskId01ChangelogPartitions) - .inState(State.CREATED) - .withInputPartitions(taskId01Partitions).build(); + .inState(State.CREATED) + .withInputPartitions(taskId01Partitions).build(); final StreamTask statefulTask2 = statefulTask(taskId02, taskId02ChangelogPartitions) .inState(State.CREATED) .withInputPartitions(taskId02Partitions).build(); @@ -1741,8 +1860,8 @@ public void shouldRethrowTaskCorruptedExceptionFromInitialization() { doThrow(new TaskCorruptedException(Collections.singleton(statefulTask1.id))).when(statefulTask1).initializeIfNeeded(); final TaskCorruptedException thrown = assertThrows( - TaskCorruptedException.class, - () -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter) + TaskCorruptedException.class, + () -> taskManager.checkStateUpdater(time.milliseconds(), noOpResetter) ); Mockito.verify(tasks).addTask(statefulTask0); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java index ab955d956f04..f18c4299b462 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java @@ -154,6 +154,7 @@ public void shouldVerifyIfPendingTaskToRecycleExist() { tasks.addPendingTaskToCloseClean(TASK_0_1); tasks.addPendingTaskToAddBack(TASK_0_2); tasks.addPendingTaskToUpdateInputPartitions(TASK_1_1, mkSet(TOPIC_PARTITION_B_0)); + tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(TASK_1_1, mkSet(TOPIC_PARTITION_B_0)); tasks.addPendingActiveTaskToSuspend(TASK_1_2); assertTrue(tasks.hasPendingTasksToRecycle()); @@ -179,6 +180,7 @@ public void shouldVerifyIfPendingTaskToInitExist() { tasks.addPendingTaskToCloseClean(TASK_0_1); tasks.addPendingTaskToAddBack(TASK_0_2); tasks.addPendingTaskToUpdateInputPartitions(TASK_1_1, mkSet(TOPIC_PARTITION_B_0)); + tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(TASK_1_1, mkSet(TOPIC_PARTITION_B_0)); tasks.addPendingActiveTaskToSuspend(TASK_1_2); assertTrue(tasks.hasPendingTasksToInit()); @@ -198,6 +200,18 @@ public void shouldAddAndRemovePendingTaskToUpdateInputPartitions() { assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0)); } + @Test + public void shouldAddAndRemovePendingTaskToCloseReviveAndUpdateInputPartitions() { + final Set expectedInputPartitions = mkSet(TOPIC_PARTITION_A_0); + assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0)); + + tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0, expectedInputPartitions); + final Set actualInputPartitions = tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0); + + assertEquals(expectedInputPartitions, actualInputPartitions); + assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0)); + } + @Test public void shouldAddAndRemovePendingTaskToCloseClean() { assertFalse(tasks.removePendingTaskToCloseClean(TASK_0_0)); @@ -237,6 +251,19 @@ public void onlyRemovePendingTaskToRecycleShouldRemoveTaskFromPendingUpdateActio assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0)); assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0)); assertNotNull(tasks.removePendingTaskToRecycle(TASK_0_0)); + assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0)); + } + + @Test + public void onlyRemovePendingTaskToCloseReviveAndUpdateInputPartitionsShouldRemoveTaskFromPendingUpdateActions() { + tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + + assertFalse(tasks.removePendingTaskToAddBack(TASK_0_0)); + assertFalse(tasks.removePendingTaskToCloseClean(TASK_0_0)); + assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0)); + assertNull(tasks.removePendingTaskToRecycle(TASK_0_0)); + assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0)); + assertNotNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0)); } @Test @@ -248,6 +275,7 @@ public void onlyRemovePendingTaskToUpdateInputPartitionsShouldRemoveTaskFromPend assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0)); assertNull(tasks.removePendingTaskToRecycle(TASK_0_0)); assertNotNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0)); + assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0)); } @Test @@ -259,6 +287,7 @@ public void onlyRemovePendingTaskToCloseCleanShouldRemoveTaskFromPendingUpdateAc assertNull(tasks.removePendingTaskToRecycle(TASK_0_0)); assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0)); assertTrue(tasks.removePendingTaskToCloseClean(TASK_0_0)); + assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0)); } @Test @@ -270,6 +299,7 @@ public void onlyRemovePendingTaskToAddBackShouldRemoveTaskFromPendingUpdateActio assertNull(tasks.removePendingTaskToRecycle(TASK_0_0)); assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0)); assertTrue(tasks.removePendingTaskToAddBack(TASK_0_0)); + assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0)); } @Test @@ -281,6 +311,7 @@ public void onlyRemovePendingTaskToSuspendShouldRemoveTaskFromPendingUpdateActio assertNull(tasks.removePendingTaskToRecycle(TASK_0_0)); assertNull(tasks.removePendingTaskToUpdateInputPartitions(TASK_0_0)); assertTrue(tasks.removePendingActiveTaskToSuspend(TASK_0_0)); + assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0)); } @Test @@ -306,8 +337,13 @@ public void shouldOnlyKeepLastUpdateAction() { assertTrue(tasks.removePendingActiveTaskToSuspend(TASK_0_0)); tasks.addPendingActiveTaskToSuspend(TASK_0_0); - tasks.addPendingTaskToRecycle(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); assertFalse(tasks.removePendingActiveTaskToSuspend(TASK_0_0)); + assertNotNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0)); + + tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + tasks.addPendingTaskToRecycle(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)); + assertNull(tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(TASK_0_0)); assertNotNull(tasks.removePendingTaskToRecycle(TASK_0_0)); } } \ No newline at end of file