Skip to content

Commit

Permalink
KAFKA-16077: Streams with state updater fails to close task upon fenc…
Browse files Browse the repository at this point in the history
…ing (apache#15117)

* KAFKA-16077: Streams fails to close task after restoration when input partitions are updated

There is a race condition in the state updater that can cause the following:

 1. We have an active task in the state updater
 2. We get fenced. We recreate the producer, transactions now uninitialized. We ask the state updater to give back the task, add a pending action to close the task clean once it’s handed back
 3. We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back and add a pending action to update its input partition
 4. The task is handed back by the state updater. We update its input partitions but forget to close it clean (pending action was overwritten)
 5. Now the task is in an initialized state, but the underlying producer does not have transactions initialized

This can cause an IllegalStateException: `Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION` when running in EOSv2.

To fix this, we introduce a new pending action CloseReviveAndUpdateInputPartitions that is added when we handle a new assignment with updated input partitions, but we still need to close the task before reopening it.

We should not remove the task twice, otherwise, we'll end up in this situation

1. We have an active task in the state updater
2. We get fenced. We recreate the producer, transactions now uninitialized. We ask the state updater to give back the task, add a pending action to close the task clean once it’s handed back
3. The state updater moves the task from the updating tasks to the removed tasks
4. We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back (adding a task+remove into the task and action queue) and add a pending action to close, revive and update input partitions
5. The task is handed back by the state updater. We close revive and update input partitions, and add the task back to the state updater
6. The state updater executes the "task+remove" action that is still in its task + action queue, and hands the task immediately back to the main thread
7. The main thread discoveres a removed task that was not restored and has no pending action attached to it. IllegalStateException

Reviewers: Bruno Cadonna <[email protected]>
  • Loading branch information
lucasbru authored Jan 5, 2024
1 parent 599e22b commit c0b6493
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class PendingUpdateAction {

enum Action {
UPDATE_INPUT_PARTITIONS,
CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS,
RECYCLE,
SUSPEND,
ADD_BACK,
Expand All @@ -48,6 +49,11 @@ public static PendingUpdateAction createUpdateInputPartition(final Set<TopicPart
return new PendingUpdateAction(Action.UPDATE_INPUT_PARTITIONS, inputPartitions);
}

public static PendingUpdateAction createCloseReviveAndUpdateInputPartition(final Set<TopicPartition> inputPartitions) {
Objects.requireNonNull(inputPartitions, "Set of input partitions to update is null!");
return new PendingUpdateAction(Action.CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS, inputPartitions);
}

public static PendingUpdateAction createRecycleTask(final Set<TopicPartition> inputPartitions) {
Objects.requireNonNull(inputPartitions, "Set of input partitions to update is null!");
return new PendingUpdateAction(Action.RECYCLE, inputPartitions);
Expand All @@ -66,7 +72,7 @@ public static PendingUpdateAction createCloseClean() {
}

public Set<TopicPartition> getInputPartitions() {
if (action != Action.UPDATE_INPUT_PARTITIONS && action != Action.RECYCLE) {
if (action != Action.UPDATE_INPUT_PARTITIONS && action != Action.CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS && action != Action.RECYCLE) {
throw new IllegalStateException("Action type " + action + " does not have a set of input partitions!");
}
return inputPartitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,12 @@ private void handleTasksInStateUpdater(final Map<TaskId, Set<TopicPartition>> ac
if (activeTasksToCreate.containsKey(taskId)) {
final Set<TopicPartition> inputPartitions = activeTasksToCreate.get(taskId);
if (task.isActive() && !task.inputPartitions().equals(inputPartitions)) {
stateUpdater.remove(taskId);
tasks.addPendingTaskToUpdateInputPartitions(taskId, inputPartitions);
if (tasks.removePendingTaskToCloseClean(taskId)) {
tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(taskId, inputPartitions);
} else {
tasks.addPendingTaskToUpdateInputPartitions(taskId, inputPartitions);
stateUpdater.remove(taskId);
}
} else if (task.isActive()) {
tasks.removePendingActiveTaskToSuspend(taskId);
if (tasks.removePendingTaskToCloseClean(taskId)) {
Expand Down Expand Up @@ -819,15 +823,17 @@ private void recycleTaskFromStateUpdater(final Task task,
}
}

private void closeTaskClean(final Task task,
final Set<Task> tasksToCloseDirty,
final Map<TaskId, RuntimeException> taskExceptions) {
/** Returns true if the task closed clean */
private boolean closeTaskClean(final Task task,
final Set<Task> tasksToCloseDirty,
final Map<TaskId, RuntimeException> taskExceptions) {
try {
task.suspend();
task.closeClean();
if (task.isActive()) {
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
}
return true;
} catch (final RuntimeException e) {
final String uncleanMessage = String.format("Failed to close task %s cleanly. " +
"Attempting to close remaining tasks before re-throwing:", task.id());
Expand All @@ -838,6 +844,7 @@ private void closeTaskClean(final Task task,
}

taskExceptions.putIfAbsent(task.id(), e);
return false;
}
}

Expand Down Expand Up @@ -916,6 +923,12 @@ private void handleRemovedTasksFromStateUpdater() {
stateUpdater.add(task);
} else if (tasks.removePendingTaskToCloseClean(task.id())) {
closeTaskClean(task, tasksToCloseDirty, taskExceptions);
} else if ((inputPartitions = tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != null) {
if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) {
task.revive();
task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
addTaskToStateUpdater(task);
}
} else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
stateUpdater.add(task);
Expand Down Expand Up @@ -950,6 +963,12 @@ private void handleRestoredTasksFromStateUpdater(final long now,
closeTaskClean(task, tasksToCloseDirty, taskExceptions);
} else if (tasks.removePendingTaskToAddBack(task.id())) {
stateUpdater.add(task);
} else if ((inputPartitions = tasks.removePendingTaskToCloseReviveAndUpdateInputPartitions(task.id())) != null) {
if (closeTaskClean(task, tasksToCloseDirty, taskExceptions)) {
task.revive();
task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
addTaskToStateUpdater(task);
}
} else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
transitRestoredTaskToRunning(task, now, offsetResetter);
Expand Down Expand Up @@ -1306,7 +1325,7 @@ private void closeTaskDirty(final Task task, final boolean removeFromTasksRegist
// before suspending and closing the topology
task.prepareCommit();
} catch (final RuntimeException swallow) {
log.error("Error flushing caches of dirty task {} ", task.id(), swallow);
log.error("Error flushing caches of dirty task {}", task.id(), swallow);
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@ public boolean hasPendingTasksToRecycle() {
return pendingUpdateActions.values().stream().anyMatch(action -> action.getAction() == Action.RECYCLE);
}

@Override
public Set<TopicPartition> removePendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId) {
if (containsTaskIdWithAction(taskId, Action.CLOSE_REVIVE_AND_UPDATE_INPUT_PARTITIONS)) {
return pendingUpdateActions.remove(taskId).getInputPartitions();
}
return null;
}

@Override
public void addPendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseReviveAndUpdateInputPartition(inputPartitions));
}

@Override
public Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId taskId) {
if (containsTaskIdWithAction(taskId, Action.UPDATE_INPUT_PARTITIONS)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public interface TasksRegistry {

void addPendingTaskToRecycle(final TaskId taskId, final Set<TopicPartition> inputPartitions);

Set<TopicPartition> removePendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId);

void addPendingTaskToCloseReviveAndUpdateInputPartitions(final TaskId taskId, final Set<TopicPartition> inputPartitions);

Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId taskId);

void addPendingTaskToUpdateInputPartitions(final TaskId taskId, final Set<TopicPartition> inputPartitions);
Expand Down
Loading

0 comments on commit c0b6493

Please sign in to comment.