Skip to content

Commit

Permalink
KAFKA-16097: Add suspended tasks back to the state updater when reass…
Browse files Browse the repository at this point in the history
…igned (apache#15163)

When a partition is revoked, the corresponding task gets a pending action
"SUSPEND". This pending action may overwrite a previous pending action.

If the task was previously removed from the state updater, e.g. because
we were fenced, the pending action is overwritten with suspend, and in
handleAssigned, upon reassignment of that task, then SUSPEND action is
removed.

Then, once the state updater executes the removal, no pending action
is registered anymore, and we run into an IllegalStateException.

This commit solves the problem by adding back reassigned tasks to the
state updater, since they may have been removed from the state updater
for another reason than being restored completely.

Reviewers: Bruno Cadonna <[email protected]>
  • Loading branch information
lucasbru authored Jan 10, 2024
1 parent 3d1d060 commit 0349f23
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,11 @@ public Action getAction() {
return action;
}


@Override
public String toString() {
return "PendingUpdateAction{" +
"inputPartitions=" + inputPartitions +
", action=" + action +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,15 @@ private void handleTasksInStateUpdater(final Map<TaskId, Set<TopicPartition>> ac
stateUpdater.remove(taskId);
}
} else if (task.isActive()) {
tasks.removePendingActiveTaskToSuspend(taskId);
if (tasks.removePendingActiveTaskToSuspend(taskId)) {
log.info(
"We were planning on suspending a task {} because it was revoked " +
"The task got reassigned to this thread, so we cancel suspending " +
"of the task, but add it back to the state updater, since we do not know " +
"if it is fully restored yet.",
taskId);
tasks.addPendingTaskToAddBack(taskId);
}
if (tasks.removePendingTaskToCloseClean(taskId)) {
log.info(
"We were planning on closing task {} because we lost one of its partitions." +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,12 @@ public void addPendingStandbyTasksToCreate(final Map<TaskId, Set<TopicPartition>

@Override
public Set<TopicPartition> removePendingTaskToRecycle(final TaskId taskId) {
if (containsTaskIdWithAction(taskId, Action.RECYCLE)) {
return pendingUpdateActions.remove(taskId).getInputPartitions();
}
return null;
return removePendingUpdateActionWithInputPartitions(taskId, Action.RECYCLE);
}

@Override
public void addPendingTaskToRecycle(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
pendingUpdateActions.put(taskId, PendingUpdateAction.createRecycleTask(inputPartitions));
updatePendingUpdateAction(taskId, PendingUpdateAction.createRecycleTask(inputPartitions));
}

@Override
Expand All @@ -122,70 +119,79 @@ public boolean hasPendingTasksToRecycle() {

@Override
public Set<TopicPartition> removePendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId) {
if (containsTaskIdWithAction(taskId, Action.CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS)) {
return pendingUpdateActions.remove(taskId).getInputPartitions();
}
return null;
return removePendingUpdateActionWithInputPartitions(taskId, Action.CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS);
}

@Override
public void addPendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseReviveAndUpdateInputPartition(inputPartitions));
updatePendingUpdateAction(taskId, PendingUpdateAction.createCloseReviveAndUpdateInputPartition(inputPartitions));
}

@Override
public Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId taskId) {
if (containsTaskIdWithAction(taskId, Action.UPDATE_INPUT_PARTITIONS)) {
return pendingUpdateActions.remove(taskId).getInputPartitions();
}
return null;
return removePendingUpdateActionWithInputPartitions(taskId, Action.UPDATE_INPUT_PARTITIONS);
}

@Override
public void addPendingTaskToUpdateInputPartitions(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
pendingUpdateActions.put(taskId, PendingUpdateAction.createUpdateInputPartition(inputPartitions));
updatePendingUpdateAction(taskId, PendingUpdateAction.createUpdateInputPartition(inputPartitions));
}

@Override
public boolean removePendingTaskToAddBack(final TaskId taskId) {
if (containsTaskIdWithAction(taskId, Action.ADD_BACK)) {
pendingUpdateActions.remove(taskId);
return true;
}
return false;
return removePendingUpdateAction(taskId, Action.ADD_BACK);
}

@Override
public void addPendingTaskToAddBack(final TaskId taskId) {
pendingUpdateActions.put(taskId, PendingUpdateAction.createAddBack());
updatePendingUpdateAction(taskId, PendingUpdateAction.createAddBack());
}

@Override
public boolean removePendingTaskToCloseClean(final TaskId taskId) {
if (containsTaskIdWithAction(taskId, Action.CLOSE_CLEAN)) {
pendingUpdateActions.remove(taskId);
return true;
}
return false;
return removePendingUpdateAction(taskId, Action.CLOSE_CLEAN);
}

@Override
public void addPendingTaskToCloseClean(final TaskId taskId) {
pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseClean());
updatePendingUpdateAction(taskId, PendingUpdateAction.createCloseClean());
}

@Override
public boolean removePendingActiveTaskToSuspend(final TaskId taskId) {
if (containsTaskIdWithAction(taskId, Action.SUSPEND)) {
pendingUpdateActions.remove(taskId);
return removePendingUpdateAction(taskId, Action.SUSPEND);
}

@Override
public void addPendingActiveTaskToSuspend(final TaskId taskId) {
updatePendingUpdateAction(taskId, PendingUpdateAction.createSuspend());
}

private Set<TopicPartition> removePendingUpdateActionWithInputPartitions(final TaskId taskId, final Action action) {
if (containsTaskIdWithAction(taskId, action)) {
final PendingUpdateAction pendingUpdateAction = pendingUpdateActions.remove(taskId);
log.info("Removing pending update action {} for task {}", taskId, pendingUpdateAction);
return pendingUpdateAction.getInputPartitions();
}
return null;
}

private boolean removePendingUpdateAction(final TaskId taskId, final Action action) {
if (containsTaskIdWithAction(taskId, action)) {
log.info("Removing pending update action {} for task {}", taskId, pendingUpdateActions.remove(taskId));
return true;
}
return false;
}

@Override
public void addPendingActiveTaskToSuspend(final TaskId taskId) {
pendingUpdateActions.put(taskId, PendingUpdateAction.createSuspend());
private void updatePendingUpdateAction(final TaskId taskId, final PendingUpdateAction newAction) {
if (pendingUpdateActions.containsKey(taskId)) {
log.info("Adding pending update action {} for task {}, previous action was {}",
newAction, taskId, pendingUpdateActions.get(taskId));
} else {
log.info("Adding pending update action {} for task {}, no previous action", newAction, taskId);
}
pendingUpdateActions.put(taskId, newAction);
}

private boolean containsTaskIdWithAction(final TaskId taskId, final Action action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,27 @@ public void shouldRemoveReassignedLostTaskInStateUpdaterFromPendingTaskToCloseCl
Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}

@Test
public void shouldRemoveReassignedTaskInStateUpdaterFromPendingSuspend() {
final StreamTask reassignedTask = statefulTask(taskId03, taskId03ChangelogPartitions)
.inState(State.RESTORING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.getTasks()).thenReturn(mkSet(reassignedTask));
when(tasks.removePendingActiveTaskToSuspend(reassignedTask.id())).thenReturn(true);

taskManager.handleAssignment(
mkMap(mkEntry(reassignedTask.id(), reassignedTask.inputPartitions())),
Collections.emptyMap()
);

Mockito.verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
Mockito.verify(tasks).removePendingActiveTaskToSuspend(reassignedTask.id());
Mockito.verify(tasks).addPendingTaskToAddBack(reassignedTask.id());
Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}

@Test
public void shouldNeverUpdateInputPartitionsOfStandbyTaskInStateUpdater() {
final StandbyTask standbyTaskToUpdateInputPartitions = standbyTask(taskId02, taskId02ChangelogPartitions)
Expand Down

0 comments on commit 0349f23

Please sign in to comment.