Skip to content

Commit

Permalink
MINOR: restore `testGetAllTopicMetadataShouldNotCreateTopicOrReturnUn…
Browse files Browse the repository at this point in the history
…knownTopicPartition` (apache#18633)

Reviewers: Ismael Juma <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
TaiJuWu authored Jan 23, 2025
1 parent 01afba8 commit ce4eeaa
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3562,6 +3562,47 @@ class KafkaApisTest extends Logging {
assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet)
}


/**
* Metadata request to fetch all topics should not result in the followings:
* 1) Auto topic creation
* 2) UNKNOWN_TOPIC_OR_PARTITION
*
* This case is testing the case that a topic is being deleted from MetadataCache right after
* authorization but before checking in MetadataCache.
*/
@Test
def testGetAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition(): Unit = {
// Setup: authorizer authorizes 2 topics, but one got deleted in metadata cache
metadataCache = mock(classOf[KRaftMetadataCache])
when(metadataCache.getAliveBrokerNodes(any())).thenReturn(List(new Node(brokerId,"localhost", 0)))
when(metadataCache.getRandomAliveBrokerId).thenReturn(None)

// 2 topics returned for authorization in during handle
val topicsReturnedFromMetadataCacheForAuthorization = Set("remaining-topic", "later-deleted-topic")
when(metadataCache.getAllTopics()).thenReturn(topicsReturnedFromMetadataCacheForAuthorization)
// 1 topic is deleted from metadata right at the time between authorization and the next getTopicMetadata() call
when(metadataCache.getTopicMetadata(
ArgumentMatchers.eq(topicsReturnedFromMetadataCacheForAuthorization),
any[ListenerName],
anyBoolean,
anyBoolean
)).thenReturn(Seq(
new MetadataResponseTopic()
.setErrorCode(Errors.NONE.code)
.setName("remaining-topic")
.setIsInternal(false)
))

val response = sendMetadataRequestWithInconsistentListeners(new ListenerName("PLAINTEXT"))
val responseTopics = response.topicMetadata().asScala.map { metadata => metadata.topic() }

// verify we don't create topic when getAllTopicMetadata
verify(autoTopicCreationManager, never).createTopics(any(), any(), any())
assertEquals(List("remaining-topic"), responseTopics)
assertTrue(response.topicsByError(Errors.UNKNOWN_TOPIC_OR_PARTITION).isEmpty)
}

@Test
def testUnauthorizedTopicMetadataRequest(): Unit = {
// 1. Set up broker information
Expand Down

0 comments on commit ce4eeaa

Please sign in to comment.