From 5946f27ac5bc1f4a5bc162ccc26f130933ab2182 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 23 Jan 2025 16:15:21 +0000 Subject: [PATCH] KAFKA-18484 [2/2]; Handle exceptions during coordinator unload (#18667) Ensure that unloading a coordinator always succeeds. Previously, we have guarded against exceptions from DeferredEvent completions. All that remains is handling exceptions from the onUnloaded() method of the coordinator state machine. Reviewers: David Jacot --- .../common/runtime/CoordinatorRuntime.java | 24 +++++++-- .../runtime/CoordinatorRuntimeTest.java | 52 +++++++++++++++++++ 2 files changed, 72 insertions(+), 4 deletions(-) diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java index b341c0adaeb07..1e9724a57aa8e 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java @@ -742,7 +742,11 @@ private void unload() { deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception()); failCurrentBatch(Errors.NOT_COORDINATOR.exception()); if (coordinator != null) { - coordinator.onUnloaded(); + try { + coordinator.onUnloaded(); + } catch (Throwable ex) { + log.error("Failed to unload coordinator for {} due to {}.", tp, ex.getMessage(), ex); + } } coordinator = null; } @@ -2415,9 +2419,19 @@ public void scheduleUnloadOperation( try { if (partitionEpoch.isEmpty() || context.epoch < partitionEpoch.getAsInt()) { log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch); - context.transitionTo(CoordinatorState.CLOSED); - coordinators.remove(tp, context); - log.info("Finished unloading metadata for {} with epoch {}.", tp, partitionEpoch); + try { + context.transitionTo(CoordinatorState.CLOSED); + log.info("Finished unloading metadata for {} with epoch {}.", tp, partitionEpoch); + } catch (Throwable ex) { + // It's very unlikely that we will ever see an exception here, since we + // already make an effort to catch exceptions in the unload method. + log.error("Failed to unload metadata for {} with epoch {} due to {}.", + tp, partitionEpoch, ex.toString()); + } finally { + // Always remove the coordinator context, otherwise the coordinator + // shard could be permanently stuck. + coordinators.remove(tp, context); + } } else { log.info("Ignored unloading metadata for {} in epoch {} since current epoch is {}.", tp, partitionEpoch, context.epoch); @@ -2498,6 +2512,8 @@ public void close() throws Exception { context.lock.lock(); try { context.transitionTo(CoordinatorState.CLOSED); + } catch (Throwable ex) { + log.warn("Failed to unload metadata for {} due to {}.", tp, ex.getMessage(), ex); } finally { context.lock.unlock(); } diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java index 3c2021a118c44..9e4e6f7bb9b44 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java @@ -1118,6 +1118,58 @@ public void testScheduleUnloadingWithStalePartitionEpoch() { assertEquals(10, ctx.epoch); } + @Test + public void testScheduleUnloadingWithException() { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = mock(MockPartitionWriter.class); + MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); + MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); + CoordinatorRuntimeMetrics metrics = mock(CoordinatorRuntimeMetrics.class); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(supplier) + .withCoordinatorRuntimeMetrics(metrics) + .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) + .withSerializer(new StringSerializer()) + .withExecutorService(mock(ExecutorService.class)) + .build(); + + doThrow(new KafkaException("error")).when(coordinator).onUnloaded(); + when(builder.withSnapshotRegistry(any())).thenReturn(builder); + when(builder.withLogContext(any())).thenReturn(builder); + when(builder.withTime(any())).thenReturn(builder); + when(builder.withTimer(any())).thenReturn(builder); + when(builder.withCoordinatorMetrics(any())).thenReturn(builder); + when(builder.withTopicPartition(any())).thenReturn(builder); + when(builder.withExecutor(any())).thenReturn(builder); + when(builder.build()).thenReturn(coordinator); + when(supplier.get()).thenReturn(builder); + + // Loads the coordinator. It directly transitions to active. + runtime.scheduleLoadOperation(TP, 10); + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(ACTIVE, ctx.state); + assertEquals(10, ctx.epoch); + + // Schedule the unloading. + runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1)); + assertEquals(CLOSED, ctx.state); + + // Verify that onUnloaded is called. + verify(coordinator, times(1)).onUnloaded(); + + // Getting the coordinator context fails because it no longer exists. + assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP)); + } + @Test public void testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionException, InterruptedException, TimeoutException { MockTimer timer = new MockTimer();