From 673a25acc3ac231c5fc1bfe0fcdd3c7e57f2de91 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Wed, 27 Sep 2023 17:51:30 +0200 Subject: [PATCH] KAFKA-10199: Do not unlock state directories of tasks in state updater (#14442) When Streams completes a rebalance, it unlocks state directories all unassigned tasks. Unfortunately, when the state updater is enabled, Streams does not look into the state updater to determine the unassigned tasks. This commit corrects this by adding the check. Reviewer: Lucas Brutschy --- .../processor/internals/TaskManager.java | 3 +- .../processor/internals/TaskManagerTest.java | 42 +++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index e7d8bbb3daee..e117e3bd5ead 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1229,9 +1229,10 @@ private void releaseLockedDirectoriesForTasks(final Set tasksToUnlock) { */ private void releaseLockedUnassignedTaskDirectories() { final Iterator taskIdIterator = lockedTaskDirectories.iterator(); + final Map allTasks = allTasks(); while (taskIdIterator.hasNext()) { final TaskId id = taskIdIterator.next(); - if (!tasks.contains(id)) { + if (!allTasks.containsKey(id)) { stateDirectory.unlock(id); taskIdIterator.remove(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 705bea18d55b..d534077cf2ef 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -1684,6 +1684,48 @@ public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception verify(stateDirectory); } + @Test + public void shouldReleaseLockForUnassignedTasksAfterRebalanceWithStateUpdater() throws Exception { + final StreamTask runningStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions).build(); + final StreamTask restoringStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId01Partitions).build(); + final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId02Partitions).build(); + final StandbyTask unassignedStandbyTask = standbyTask(taskId03, taskId03ChangelogPartitions) + .inState(State.CREATED) + .withInputPartitions(taskId03Partitions).build(); + final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask))); + when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask, restoringStatefulTask)); + when(tasks.allTasks()).thenReturn(mkSet(runningStatefulTask)); + expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03); + expectUnlockFor(taskId03); + makeTaskFolders( + taskId00.toString(), + taskId01.toString(), + taskId02.toString(), + taskId03.toString() + ); + replay(stateDirectory); + + final Set assigned = mkSet(t1p0, t1p1, t1p2); + expect(consumer.assignment()).andReturn(assigned); + consumer.pause(mkSet(t1p1, t1p2)); + replay(consumer); + + taskManager.handleRebalanceStart(singleton("topic")); + taskManager.handleRebalanceComplete(); + + verify(consumer); + verify(stateDirectory); + assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02))); + } + @Test public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws Exception { final Map changelogOffsets = mkMap(