Commit 5cff8f6
KAFKA-14296; Partition leaders are not demoted during kraft controlled shutdown (apache#12741)
When the `BrokerServer` starts its shutdown process, it transitions to `SHUTTING_DOWN` and sets `isShuttingDown` to `true`. With this state change, follower state changes are short-circuited. This means that a broker which was serving as leader would keep acting as leader until controlled shutdown completes. Instead, we want the leader and ISR state to be updated so that requests return NOT_LEADER and the client can find the new leader. We missed this case while implementing apache#12187.

This patch fixes the issue and updates an existing test to ensure that `isShuttingDown` has no effect. We should consider adding integration tests for this as well; we can do that separately.

Reviewers: Ismael Juma <[email protected]>, José Armando García Sancio <[email protected]>, Jason Gustafson <[email protected]>
1 parent 0cb1d61 commit 5cff8f6

2 files changed: +28 -25 lines

core/src/main/scala/kafka/server/ReplicaManager.scala

+17 -22
@@ -2142,7 +2142,6 @@ class ReplicaManager(val config: KafkaConfig,
   ): Unit = {
     stateChangeLogger.info(s"Transitioning ${localFollowers.size} partition(s) to " +
       "local followers.")
-    val shuttingDown = isShuttingDown.get()
     val partitionsToStartFetching = new mutable.HashMap[TopicPartition, Partition]
     val partitionsToStopFetching = new mutable.HashMap[TopicPartition, Boolean]
     val followerTopicSet = new mutable.HashSet[String]
@@ -2151,28 +2150,24 @@ class ReplicaManager(val config: KafkaConfig,
       try {
         followerTopicSet.add(tp.topic)
 
-        if (shuttingDown) {
-          stateChangeLogger.trace(s"Unable to start fetching $tp with topic " +
-            s"ID ${info.topicId} because the replica manager is shutting down.")
-        } else {
-          // We always update the follower state.
-          // - This ensure that a replica with no leader can step down;
-          // - This also ensures that the local replica is created even if the leader
-          //   is unavailable. This is required to ensure that we include the partition's
-          //   high watermark in the checkpoint file (see KAFKA-1647).
-          val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-          val isNewLeaderEpoch = partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))
-
-          if (isInControlledShutdown && (info.partition.leader == NO_LEADER ||
-              !info.partition.isr.contains(config.brokerId))) {
-            // During controlled shutdown, replica with no leaders and replica
-            // where this broker is not in the ISR are stopped.
-            partitionsToStopFetching.put(tp, false)
-          } else if (isNewLeaderEpoch) {
-            // Otherwise, fetcher is restarted if the leader epoch has changed.
-            partitionsToStartFetching.put(tp, partition)
-          }
+        // We always update the follower state.
+        // - This ensure that a replica with no leader can step down;
+        // - This also ensures that the local replica is created even if the leader
+        //   is unavailable. This is required to ensure that we include the partition's
+        //   high watermark in the checkpoint file (see KAFKA-1647).
+        val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
+        val isNewLeaderEpoch = partition.makeFollower(state, offsetCheckpoints, Some(info.topicId))
+
+        if (isInControlledShutdown && (info.partition.leader == NO_LEADER ||
+            !info.partition.isr.contains(config.brokerId))) {
+          // During controlled shutdown, replica with no leaders and replica
+          // where this broker is not in the ISR are stopped.
+          partitionsToStopFetching.put(tp, false)
+        } else if (isNewLeaderEpoch) {
+          // Otherwise, fetcher is restarted if the leader epoch has changed.
+          partitionsToStartFetching.put(tp, partition)
         }
+
         changedPartitions.add(partition)
       } catch {
         case e: KafkaStorageException =>
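
To make the behavioral change easier to see outside the diff context, here is a minimal, self-contained Scala sketch of the fetcher decision after this patch. The object, method, and type names below are illustrative stand-ins, not ReplicaManager APIs; the condition mirrors the `isInControlledShutdown` / `NO_LEADER` / ISR check added above, and the point is that `isShuttingDown` no longer appears in it.

object FollowerTransitionSketch {

  sealed trait FetcherAction
  case object StopFetching extends FetcherAction      // stopped during controlled shutdown
  case object StartFetching extends FetcherAction     // (re)started on a new leader epoch
  case object KeepCurrentState extends FetcherAction  // nothing to change

  val NoLeader: Int = -1 // stand-in for NO_LEADER

  // Mirrors the patched branch above: the follower state has already been
  // updated via makeFollower (so a demoted leader steps down regardless of
  // isShuttingDown); this only decides what happens to the fetcher.
  def decideFetcherAction(
    isInControlledShutdown: Boolean,
    leader: Int,
    isr: Set[Int],
    localBrokerId: Int,
    isNewLeaderEpoch: Boolean
  ): FetcherAction = {
    if (isInControlledShutdown && (leader == NoLeader || !isr.contains(localBrokerId)))
      StopFetching
    else if (isNewLeaderEpoch)
      StartFetching
    else
      KeepCurrentState
  }

  def main(args: Array[String]): Unit = {
    // Broker 0 is in controlled shutdown and has been dropped from the ISR:
    // fetching stops, so clients get NOT_LEADER and discover the new leader.
    println(decideFetcherAction(isInControlledShutdown = true,
      leader = 1, isr = Set(1, 2), localBrokerId = 0, isNewLeaderEpoch = true))
    // Ordinary follower transition with a bumped leader epoch: restart the fetcher.
    println(decideFetcherAction(isInControlledShutdown = false,
      leader = 1, isr = Set(0, 1, 2), localBrokerId = 0, isNewLeaderEpoch = true))
  }
}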

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

+11 -3
@@ -21,7 +21,7 @@ import java.io.File
 import java.net.InetAddress
 import java.nio.file.Files
 import java.util
-import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicReference}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.stream.IntStream
 import java.util.{Collections, Optional, Properties}
@@ -2210,7 +2210,8 @@ class ReplicaManagerTest {
     aliveBrokerIds: Seq[Int] = Seq(0, 1),
     propsModifier: Properties => Unit = _ => {},
     mockReplicaFetcherManager: Option[ReplicaFetcherManager] = None,
-    mockReplicaAlterLogDirsManager: Option[ReplicaAlterLogDirsManager] = None
+    mockReplicaAlterLogDirsManager: Option[ReplicaAlterLogDirsManager] = None,
+    isShuttingDown: AtomicBoolean = new AtomicBoolean(false)
   ): ReplicaManager = {
     val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
     props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath + "," + TestUtils.tempRelativeDir("data2").getAbsolutePath)
@@ -2245,6 +2246,7 @@
       metadataCache = metadataCache,
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
       alterPartitionManager = alterPartitionManager,
+      isShuttingDown = isShuttingDown,
       delayedProducePurgatoryParam = Some(mockProducePurgatory),
       delayedFetchPurgatoryParam = Some(mockFetchPurgatory),
       delayedDeleteRecordsPurgatoryParam = Some(mockDeleteRecordsPurgatory),
@@ -3868,10 +3870,12 @@
     val foo2 = new TopicPartition("foo", 2)
 
     val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
+    val isShuttingDown = new AtomicBoolean(false)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(
       timer = new MockTimer(time),
       brokerId = localId,
-      mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
+      mockReplicaFetcherManager = Some(mockReplicaFetcherManager),
+      isShuttingDown = isShuttingDown
     )
 
     try {
@@ -3940,6 +3944,10 @@
 
     reset(mockReplicaFetcherManager)
 
+    // The broker transitions to SHUTTING_DOWN state. This should not have
+    // any impact in KRaft mode.
+    isShuttingDown.set(true)
+
     // The replica begins the controlled shutdown.
     replicaManager.beginControlledShutdown()
 
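
As a rough illustration of the testing pattern used above (and not the real ReplicaManagerTest harness), the following self-contained Scala sketch shows the idea: share an AtomicBoolean with the component under test, flip it mid-test, and assert that the follower-transition decision is unchanged. `FetchDecider` is a hypothetical stand-in, not a Kafka class.

import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the component under test. The constructor keeps
// the flag only to mirror the real wiring; after this patch the real code
// likewise ignores isShuttingDown when transitioning followers.
final class FetchDecider(isShuttingDown: AtomicBoolean) {
  def shouldStartFetching(isNewLeaderEpoch: Boolean): Boolean = isNewLeaderEpoch
}

object ShuttingDownHasNoEffectSketch {
  def main(args: Array[String]): Unit = {
    val isShuttingDown = new AtomicBoolean(false)
    val decider = new FetchDecider(isShuttingDown)

    val before = decider.shouldStartFetching(isNewLeaderEpoch = true)
    isShuttingDown.set(true) // broker enters SHUTTING_DOWN mid-test
    val after = decider.shouldStartFetching(isNewLeaderEpoch = true)

    assert(before == after, "isShuttingDown must not affect follower transitions")
    println("ok: the shutting-down flag has no effect on the decision")
  }
}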