Skip to content

Commit

Permalink
KAFKA-10199: Do not unlock state directories of tasks in state updater (
Browse files Browse the repository at this point in the history
apache#14442)

When Streams completes a rebalance, it unlocks state directories
all unassigned tasks. Unfortunately, when the state updater is enabled,
Streams does not look into the state updater to determine the
unassigned tasks.
This commit corrects this by adding the check.

Reviewer: Lucas Brutschy <[email protected]>
  • Loading branch information
cadonna authored Sep 27, 2023
1 parent 86450bf commit 673a25a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1229,9 +1229,10 @@ private void releaseLockedDirectoriesForTasks(final Set<TaskId> tasksToUnlock) {
*/
private void releaseLockedUnassignedTaskDirectories() {
final Iterator<TaskId> taskIdIterator = lockedTaskDirectories.iterator();
final Map<TaskId, Task> allTasks = allTasks();
while (taskIdIterator.hasNext()) {
final TaskId id = taskIdIterator.next();
if (!tasks.contains(id)) {
if (!allTasks.containsKey(id)) {
stateDirectory.unlock(id);
taskIdIterator.remove();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1684,6 +1684,48 @@ public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception
verify(stateDirectory);
}

@Test
public void shouldReleaseLockForUnassignedTasksAfterRebalanceWithStateUpdater() throws Exception {
final StreamTask runningStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId00Partitions).build();
final StreamTask restoringStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId01Partitions).build();
final StandbyTask standbyTask = standbyTask(taskId02, taskId02ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId02Partitions).build();
final StandbyTask unassignedStandbyTask = standbyTask(taskId03, taskId03ChangelogPartitions)
.inState(State.CREATED)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTask, restoringStatefulTask));
when(tasks.allTasks()).thenReturn(mkSet(runningStatefulTask));
expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
expectUnlockFor(taskId03);
makeTaskFolders(
taskId00.toString(),
taskId01.toString(),
taskId02.toString(),
taskId03.toString()
);
replay(stateDirectory);

final Set<TopicPartition> assigned = mkSet(t1p0, t1p1, t1p2);
expect(consumer.assignment()).andReturn(assigned);
consumer.pause(mkSet(t1p1, t1p2));
replay(consumer);

taskManager.handleRebalanceStart(singleton("topic"));
taskManager.handleRebalanceComplete();

verify(consumer);
verify(stateDirectory);
assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02)));
}

@Test
public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws Exception {
final Map<TopicPartition, Long> changelogOffsets = mkMap(
Expand Down

0 comments on commit 673a25a

Please sign in to comment.