Skip to content

Commit

Permalink
KAFKA-18484 [2/2]; Handle exceptions during coordinator unload (apache#18667)
Browse files Browse the repository at this point in the history

Ensure that unloading a coordinator always succeeds. Previously, we have
guarded against exceptions from DeferredEvent completions. All that
remains is handling exceptions from the onUnloaded() method of the
coordinator state machine.

Reviewers: David Jacot <[email protected]>
  • Loading branch information
squah-confluent authored Jan 23, 2025
1 parent 8000d04 commit 5946f27
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,11 @@ private void unload() {
deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
failCurrentBatch(Errors.NOT_COORDINATOR.exception());
if (coordinator != null) {
coordinator.onUnloaded();
try {
coordinator.onUnloaded();
} catch (Throwable ex) {
log.error("Failed to unload coordinator for {} due to {}.", tp, ex.getMessage(), ex);
}
}
coordinator = null;
}
Expand Down Expand Up @@ -2415,9 +2419,19 @@ public void scheduleUnloadOperation(
try {
if (partitionEpoch.isEmpty() || context.epoch < partitionEpoch.getAsInt()) {
log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch);
context.transitionTo(CoordinatorState.CLOSED);
coordinators.remove(tp, context);
log.info("Finished unloading metadata for {} with epoch {}.", tp, partitionEpoch);
try {
context.transitionTo(CoordinatorState.CLOSED);
log.info("Finished unloading metadata for {} with epoch {}.", tp, partitionEpoch);
} catch (Throwable ex) {
// It's very unlikely that we will ever see an exception here, since we
// already make an effort to catch exceptions in the unload method.
log.error("Failed to unload metadata for {} with epoch {} due to {}.",
tp, partitionEpoch, ex.toString());
} finally {
// Always remove the coordinator context, otherwise the coordinator
// shard could be permanently stuck.
coordinators.remove(tp, context);
}
} else {
log.info("Ignored unloading metadata for {} in epoch {} since current epoch is {}.",
tp, partitionEpoch, context.epoch);
Expand Down Expand Up @@ -2498,6 +2512,8 @@ public void close() throws Exception {
context.lock.lock();
try {
context.transitionTo(CoordinatorState.CLOSED);
} catch (Throwable ex) {
log.warn("Failed to unload metadata for {} due to {}.", tp, ex.getMessage(), ex);
} finally {
context.lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,58 @@ public void testScheduleUnloadingWithStalePartitionEpoch() {
assertEquals(10, ctx.epoch);
}

@Test
public void testScheduleUnloadingWithException() {
    // Verifies that unloading a coordinator succeeds (context closed and removed)
    // even when the shard's onUnloaded() callback throws.
    MockTimer mockTimer = new MockTimer();
    MockPartitionWriter partitionWriter = mock(MockPartitionWriter.class);
    MockCoordinatorShardBuilderSupplier shardBuilderSupplier = mock(MockCoordinatorShardBuilderSupplier.class);
    MockCoordinatorShardBuilder shardBuilder = mock(MockCoordinatorShardBuilder.class);
    MockCoordinatorShard shard = mock(MockCoordinatorShard.class);
    CoordinatorRuntimeMetrics runtimeMetrics = mock(CoordinatorRuntimeMetrics.class);

    CoordinatorRuntime<MockCoordinatorShard, String> coordinatorRuntime =
        new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(partitionWriter)
            .withCoordinatorShardBuilderSupplier(shardBuilderSupplier)
            .withCoordinatorRuntimeMetrics(runtimeMetrics)
            .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(mock(ExecutorService.class))
            .build();

    // Make the shard fail during unload; the runtime must tolerate this.
    doThrow(new KafkaException("error")).when(shard).onUnloaded();

    // Wire the mocked builder chain so loading yields the mocked shard.
    when(shardBuilderSupplier.get()).thenReturn(shardBuilder);
    when(shardBuilder.withSnapshotRegistry(any())).thenReturn(shardBuilder);
    when(shardBuilder.withLogContext(any())).thenReturn(shardBuilder);
    when(shardBuilder.withTime(any())).thenReturn(shardBuilder);
    when(shardBuilder.withTimer(any())).thenReturn(shardBuilder);
    when(shardBuilder.withCoordinatorMetrics(any())).thenReturn(shardBuilder);
    when(shardBuilder.withTopicPartition(any())).thenReturn(shardBuilder);
    when(shardBuilder.withExecutor(any())).thenReturn(shardBuilder);
    when(shardBuilder.build()).thenReturn(shard);

    // Load the coordinator; it transitions straight to ACTIVE.
    coordinatorRuntime.scheduleLoadOperation(TP, 10);
    CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext context = coordinatorRuntime.contextOrThrow(TP);
    assertEquals(ACTIVE, context.state);
    assertEquals(10, context.epoch);

    // Unload with a newer epoch; despite onUnloaded() throwing, the context closes.
    coordinatorRuntime.scheduleUnloadOperation(TP, OptionalInt.of(context.epoch + 1));
    assertEquals(CLOSED, context.state);

    // The throwing callback was still invoked exactly once.
    verify(shard, times(1)).onUnloaded();

    // The context must have been removed even though unloading threw.
    assertThrows(NotCoordinatorException.class, () -> coordinatorRuntime.contextOrThrow(TP));
}

@Test
public void testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionException, InterruptedException, TimeoutException {
MockTimer timer = new MockTimer();
Expand Down

0 comments on commit 5946f27

Please sign in to comment.