From 70c8b8d0af50d8982c71d7a994270961441e5af7 Mon Sep 17 00:00:00 2001
From: Luke Chen
Date: Sat, 6 Jan 2024 23:00:20 +0900
Subject: [PATCH] KAFKA-16059: close more kafkaApis instances (#15132)

Reviewers: Divij Vaidya, Justine Olshan
---
 .../unit/kafka/server/KafkaApisTest.scala     | 320 +++++++++++-------
 1 file changed, 194 insertions(+), 126 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index a433c5708386..721157279e8d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -633,8 +633,8 @@ class KafkaApisTest extends Logging {
     val requestData = DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)
     val requestBuilder = new DescribeQuorumRequest.Builder(requestData)
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-
-    testForwardableApi(kafkaApis = createKafkaApis(raftSupport = true),
+    kafkaApis = createKafkaApis(raftSupport = true)
+    testForwardableApi(kafkaApis = kafkaApis,
       ApiKeys.DESCRIBE_QUORUM,
       requestBuilder
     )
@@ -645,14 +645,16 @@
     requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]
   ): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    testForwardableApi(kafkaApis = createKafkaApis(enableForwarding = true, raftSupport = true),
+    kafkaApis = createKafkaApis(enableForwarding = true, raftSupport = true)
+    testForwardableApi(kafkaApis = kafkaApis,
       apiKey,
       requestBuilder
     )
   }

   private def testForwardableApi(apiKey: ApiKeys, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Unit = {
-    testForwardableApi(kafkaApis = createKafkaApis(enableForwarding = true),
+    kafkaApis = createKafkaApis(enableForwarding = true)
+    testForwardableApi(kafkaApis = kafkaApis,
       apiKey,
       requestBuilder
     )
   }
@@ -1669,12 +1671,16 @@
       val request = buildRequest(offsetCommitRequest)
       when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
         any[Long])).thenReturn(0)
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
-
-      val response = verifyNoThrottling[OffsetCommitResponse](request)
-      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-        Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode))
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
+
+        val response = verifyNoThrottling[OffsetCommitResponse](request)
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+          Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode))
+      } finally {
+        kafkaApis.close()
+      }
     }

     checkInvalidPartition(-1)
@@ -1701,11 +1707,15 @@
       val request = buildRequest(offsetCommitRequest)
       when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
         any[Long])).thenReturn(0)
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleTxnOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
-
-      val response = verifyNoThrottling[TxnOffsetCommitResponse](request)
-      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleTxnOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
+
+        val response = verifyNoThrottling[TxnOffsetCommitResponse](request)
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition))
+      } finally {
+        kafkaApis.close()
+      }
     }

     checkInvalidPartition(-1)
     checkInvalidPartition(1)
@@ -2061,20 +2071,24 @@
         responseCallback.capture(),
         ArgumentMatchers.eq(requestLocal)
       )).thenAnswer(_ => responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, Errors.PRODUCER_FENCED)))
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleInitProducerIdRequest(request, requestLocal)
-
-      verify(requestChannel).sendResponse(
-        ArgumentMatchers.eq(request),
-        capturedResponse.capture(),
-        ArgumentMatchers.eq(None)
-      )
-      val response = capturedResponse.getValue
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleInitProducerIdRequest(request, requestLocal)
+
+        verify(requestChannel).sendResponse(
+          ArgumentMatchers.eq(request),
+          capturedResponse.capture(),
+          ArgumentMatchers.eq(None)
+        )
+        val response = capturedResponse.getValue

-      if (version < 4) {
-        assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
-      } else {
-        assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+        if (version < 4) {
+          assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
+        } else {
+          assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+        }
+      } finally {
+        kafkaApis.close()
       }
     }
   }
@@ -2119,20 +2133,24 @@
         responseCallback.capture(),
         ArgumentMatchers.eq(requestLocal)
       )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleAddOffsetsToTxnRequest(request, requestLocal)
-
-      verify(requestChannel).sendResponse(
-        ArgumentMatchers.eq(request),
-        capturedResponse.capture(),
-        ArgumentMatchers.eq(None)
-      )
-      val response = capturedResponse.getValue
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleAddOffsetsToTxnRequest(request, requestLocal)
+
+        verify(requestChannel).sendResponse(
+          ArgumentMatchers.eq(request),
+          capturedResponse.capture(),
+          ArgumentMatchers.eq(None)
+        )
+        val response = capturedResponse.getValue

-      if (version < 2) {
-        assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
-      } else {
-        assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+        if (version < 2) {
+          assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
+        } else {
+          assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+        }
+      } finally {
+        kafkaApis.close()
       }
     }
   }
@@ -2173,20 +2191,24 @@
         responseCallback.capture(),
         ArgumentMatchers.eq(requestLocal)
       )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleAddPartitionsToTxnRequest(request, requestLocal)
-
-      verify(requestChannel).sendResponse(
-        ArgumentMatchers.eq(request),
-        capturedResponse.capture(),
-        ArgumentMatchers.eq(None)
-      )
-      val response = capturedResponse.getValue
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleAddPartitionsToTxnRequest(request, requestLocal)
+
+        verify(requestChannel).sendResponse(
+          ArgumentMatchers.eq(request),
+          capturedResponse.capture(),
+          ArgumentMatchers.eq(None)
+        )
+        val response = capturedResponse.getValue

-      if (version < 2) {
-        assertEquals(Collections.singletonMap(topicPartition, Errors.INVALID_PRODUCER_EPOCH), response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID))
-      } else {
-        assertEquals(Collections.singletonMap(topicPartition, Errors.PRODUCER_FENCED), response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID))
+        if (version < 2) {
+          assertEquals(Collections.singletonMap(topicPartition, Errors.INVALID_PRODUCER_EPOCH), response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID))
+        } else {
+          assertEquals(Collections.singletonMap(topicPartition, Errors.PRODUCER_FENCED), response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID))
+        }
+      } finally {
+        kafkaApis.close()
       }
     }
   }
@@ -2416,20 +2438,24 @@
         responseCallback.capture(),
         ArgumentMatchers.eq(requestLocal)
       )).thenAnswer(_ => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleEndTxnRequest(request, requestLocal)
-
-      verify(requestChannel).sendResponse(
-        ArgumentMatchers.eq(request),
-        capturedResponse.capture(),
-        ArgumentMatchers.eq(None)
-      )
-      val response = capturedResponse.getValue
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleEndTxnRequest(request, requestLocal)
+
+        verify(requestChannel).sendResponse(
+          ArgumentMatchers.eq(request),
+          capturedResponse.capture(),
+          ArgumentMatchers.eq(None)
+        )
+        val response = capturedResponse.getValue

-      if (version < 2) {
-        assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
-      } else {
-        assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+        if (version < 2) {
+          assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
+        } else {
+          assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
+        }
+      } finally {
+        kafkaApis.close()
       }
     }
   }
@@ -2477,16 +2503,20 @@
         any[Long])).thenReturn(0)
       when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
         any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
-
-      val response = verifyNoThrottling[ProduceResponse](request)
-
-      assertEquals(1, response.data.responses.size)
-      val topicProduceResponse = response.data.responses.asScala.head
-      assertEquals(1, topicProduceResponse.partitionResponses.size)
-      val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head
-      assertEquals(Errors.INVALID_PRODUCER_EPOCH, Errors.forCode(partitionProduceResponse.errorCode))
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
+
+        val response = verifyNoThrottling[ProduceResponse](request)
+
+        assertEquals(1, response.data.responses.size)
+        val topicProduceResponse = response.data.responses.asScala.head
+        assertEquals(1, topicProduceResponse.partitionResponses.size)
+        val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head
+        assertEquals(Errors.INVALID_PRODUCER_EPOCH, Errors.forCode(partitionProduceResponse.errorCode))
+      } finally {
+        kafkaApis.close()
+      }
     }
   }

@@ -2717,20 +2747,24 @@
         .build(version.toShort)
       val request = buildRequest(produceRequest)

-      kafkaApis = createKafkaApis()
-      kafkaApis.handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
-
-      verify(replicaManager).appendRecords(anyLong,
-        anyShort,
-        ArgumentMatchers.eq(false),
-        ArgumentMatchers.eq(AppendOrigin.CLIENT),
-        any(),
-        responseCallback.capture(),
-        any(),
-        any(),
-        any(),
-        ArgumentMatchers.eq(transactionalId),
-        any())
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleProduceRequest(request, RequestLocal.withThreadConfinedCaching)
+
+        verify(replicaManager).appendRecords(anyLong,
+          anyShort,
+          ArgumentMatchers.eq(false),
+          ArgumentMatchers.eq(AppendOrigin.CLIENT),
+          any(),
+          responseCallback.capture(),
+          any(),
+          any(),
+          any(),
+          ArgumentMatchers.eq(transactionalId),
+          any())
+      } finally {
+        kafkaApis.close()
+      }
     }
   }

@@ -2750,11 +2784,15 @@

       when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
         any[Long])).thenReturn(0)
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleAddPartitionsToTxnRequest(request, RequestLocal.withThreadConfinedCaching)
-
-      val response = verifyNoThrottling[AddPartitionsToTxnResponse](request)
-      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(invalidTopicPartition))
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleAddPartitionsToTxnRequest(request, RequestLocal.withThreadConfinedCaching)
+
+        val response = verifyNoThrottling[AddPartitionsToTxnResponse](request)
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(invalidTopicPartition))
+      } finally {
+        kafkaApis.close()
+      }
     }

     checkInvalidPartition(-1)
     checkInvalidPartition(1)
@@ -2763,32 +2801,37 @@

   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
     assertThrows(classOf[UnsupportedVersionException],
-      () => createKafkaApis(IBP_0_10_2_IV0).handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
+      () => kafkaApis.handleAddOffsetsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
   }

   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
     assertThrows(classOf[UnsupportedVersionException],
-      () => createKafkaApis(IBP_0_10_2_IV0).handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
+      () => kafkaApis.handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
   }

   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
     assertThrows(classOf[UnsupportedVersionException],
-      () => createKafkaApis(IBP_0_10_2_IV0).handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
+      () => kafkaApis.handleAddPartitionsToTxnRequest(null, RequestLocal.withThreadConfinedCaching))
   }

   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
     assertThrows(classOf[UnsupportedVersionException],
-      () => createKafkaApis(IBP_0_10_2_IV0).handleEndTxnRequest(null, RequestLocal.withThreadConfinedCaching))
+      () => kafkaApis.handleEndTxnRequest(null, RequestLocal.withThreadConfinedCaching))
   }

   @Test
   def shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    kafkaApis = createKafkaApis(IBP_0_10_2_IV0)
     assertThrows(classOf[UnsupportedVersionException],
-      () => createKafkaApis(IBP_0_10_2_IV0).handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching))
+      () => kafkaApis.handleWriteTxnMarkersRequest(null, RequestLocal.withThreadConfinedCaching))
   }

   @Test
@@ -3783,13 +3826,17 @@
       )).thenReturn(CompletableFuture.completedFuture(
         new OffsetDeleteResponseData()
       ))
-      kafkaApis = createKafkaApis()
-      kafkaApis.handleOffsetDeleteRequest(request, RequestLocal.NoCaching)
+      val kafkaApis = createKafkaApis()
+      try {
+        kafkaApis.handleOffsetDeleteRequest(request, RequestLocal.NoCaching)

-      val response = verifyNoThrottling[OffsetDeleteResponse](request)
+        val response = verifyNoThrottling[OffsetDeleteResponse](request)

-      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
-        Errors.forCode(response.data.topics.find(topic).partitions.find(invalidPartitionId).errorCode))
+        assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+          Errors.forCode(response.data.topics.find(topic).partitions.find(invalidPartitionId).errorCode))
+      } finally {
+        kafkaApis.close()
+      }
     }

     checkInvalidPartition(-1)
@@ -4075,6 +4122,7 @@
         assertEquals(authorizedTopic, metadataResponseTopic.name())
       }
     }
+    kafkaApis.close()

     // 4. Send TopicMetadataReq using topic name
     reset(clientRequestQuotaManager, requestChannel)
@@ -6689,67 +6737,78 @@
   @Test
   def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldNeverHandleErrorMessage(kafkaApis.handleLeaderAndIsrRequest)
   }

   @Test
   def testRaftShouldNeverHandleStopReplicaRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleStopReplicaRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldNeverHandleErrorMessage(kafkaApis.handleStopReplicaRequest)
   }

   @Test
   def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching))
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldNeverHandleErrorMessage(kafkaApis.handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching))
   }

   @Test
   def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleControlledShutdownRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldNeverHandleErrorMessage(kafkaApis.handleControlledShutdownRequest)
   }

   @Test
   def testRaftShouldNeverHandleAlterPartitionRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleAlterPartitionRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldNeverHandleErrorMessage(kafkaApis.handleAlterPartitionRequest)
   }

   @Test
   def testRaftShouldNeverHandleEnvelope(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldNeverHandleErrorMessage(kafkaApis.handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
   }

   @Test
   def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTopicsRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateTopicsRequest)
   }

   @Test
   def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreatePartitionsRequest)
  }

   @Test
   def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleDeleteTopicsRequest)
   }

   @Test
   def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateAcls)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateAcls)
   }

   @Test
   def testRaftShouldAlwaysForwardDeleteAcls(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleDeleteAcls)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleDeleteAcls)
   }

   @Test
@@ -6793,7 +6852,8 @@
   @Test
   def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterPartitionReassignmentsRequest)
   }

   @Test
@@ -6839,7 +6899,8 @@
   // We skip the pre-forward checks in handleCreateTokenRequest
   def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTokenRequestZk)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleCreateTokenRequestZk)
   }

   @Test
@@ -6847,7 +6908,8 @@
   // We skip the pre-forward checks in handleRenewTokenRequest
   def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleRenewTokenRequestZk)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleRenewTokenRequestZk)
   }

   @Test
@@ -6855,37 +6917,43 @@
   // We skip the pre-forward checks in handleExpireTokenRequest
   def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleExpireTokenRequestZk)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleExpireTokenRequestZk)
   }

   @Test
   def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest)
   }

   @Test
   def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterUserScramCredentialsRequest)
   }

   @Test
   def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleUpdateFeatures)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleUpdateFeatures)
   }

   @Test
   def testRaftShouldAlwaysForwardElectLeaders(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleElectLeaders)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleElectLeaders)
   }

   @Test
   def testRaftShouldAlwaysForwardListPartitionReassignments(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleListPartitionReassignmentsRequest)
+    kafkaApis = createKafkaApis(raftSupport = true)
+    verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleListPartitionReassignmentsRequest)
   }

   @Test
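
Note on the field assignments (not part of the patch): the hunks that replace a
throwaway createKafkaApis(...) call with an assignment to the shared kafkaApis
field only avoid a leak if the test class closes that field after each test.
That teardown is not visible in these hunks; a minimal sketch of the assumed
lifecycle inside KafkaApisTest looks like this (the real tearDown presumably
also shuts down quota managers and other fixtures):

    import org.junit.jupiter.api.AfterEach

    // Shared handler under test; individual tests reassign it as needed.
    var kafkaApis: KafkaApis = _

    @AfterEach
    def tearDown(): Unit = {
      // Close whichever instance the last test left behind, releasing the
      // request handler's resources (KafkaApis.close() exists, as the
      // try/finally hunks above call it directly).
      if (kafkaApis != null)
        kafkaApis.close()
    }

Tests that build several instances in a loop cannot rely on this per-test hook,
which is why those call sites get a local val plus try/finally instead.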
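Illustrative refactor (hypothetical; this patch does not introduce it): the
repeated try/finally blocks could equally be expressed as a small loan-pattern
helper inside KafkaApisTest, since only the close-on-exit behavior varies:

    // Lends a KafkaApis instance to `body` and always closes it afterwards.
    private def withKafkaApis[T](apis: KafkaApis)(body: KafkaApis => T): T = {
      try {
        body(apis)
      } finally {
        apis.close()
      }
    }

    // Usage, mirroring the checkInvalidPartition helpers rewritten above:
    //   withKafkaApis(createKafkaApis()) { kafkaApis =>
    //     kafkaApis.handleOffsetCommitRequest(request, RequestLocal.withThreadConfinedCaching)
    //     val response = verifyNoThrottling[OffsetCommitResponse](request)
    //     assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
    //       Errors.forCode(response.data.topics().get(0).partitions().get(0).errorCode))
    //   }

The explicit try/finally chosen by the patch keeps each test self-contained at
the cost of the duplication visible in the diff stats.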