diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 5dc5ad06f29e3..af26996c56eb2 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -3562,6 +3562,47 @@ class KafkaApisTest extends Logging { assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet) } + + /** + * Metadata request to fetch all topics should not result in the followings: + * 1) Auto topic creation + * 2) UNKNOWN_TOPIC_OR_PARTITION + * + * This case is testing the case that a topic is being deleted from MetadataCache right after + * authorization but before checking in MetadataCache. + */ + @Test + def testGetAllTopicMetadataShouldNotCreateTopicOrReturnUnknownTopicPartition(): Unit = { + // Setup: authorizer authorizes 2 topics, but one got deleted in metadata cache + metadataCache = mock(classOf[KRaftMetadataCache]) + when(metadataCache.getAliveBrokerNodes(any())).thenReturn(List(new Node(brokerId,"localhost", 0))) + when(metadataCache.getRandomAliveBrokerId).thenReturn(None) + + // 2 topics returned for authorization in during handle + val topicsReturnedFromMetadataCacheForAuthorization = Set("remaining-topic", "later-deleted-topic") + when(metadataCache.getAllTopics()).thenReturn(topicsReturnedFromMetadataCacheForAuthorization) + // 1 topic is deleted from metadata right at the time between authorization and the next getTopicMetadata() call + when(metadataCache.getTopicMetadata( + ArgumentMatchers.eq(topicsReturnedFromMetadataCacheForAuthorization), + any[ListenerName], + anyBoolean, + anyBoolean + )).thenReturn(Seq( + new MetadataResponseTopic() + .setErrorCode(Errors.NONE.code) + .setName("remaining-topic") + .setIsInternal(false) + )) + + val response = sendMetadataRequestWithInconsistentListeners(new ListenerName("PLAINTEXT")) + val responseTopics = response.topicMetadata().asScala.map { metadata => metadata.topic() } + + // verify we don't create topic when getAllTopicMetadata + verify(autoTopicCreationManager, never).createTopics(any(), any(), any()) + assertEquals(List("remaining-topic"), responseTopics) + assertTrue(response.topicsByError(Errors.UNKNOWN_TOPIC_OR_PARTITION).isEmpty) + } + @Test def testUnauthorizedTopicMetadataRequest(): Unit = { // 1. Set up broker information