From c043a69a949bbfe18acdbe14c5f393c1d4a81de7 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 27 Jun 2023 12:28:07 +0800 Subject: [PATCH] Fix flaky OffsetTopicWriteTimeoutTest (#1914) ### Motivation Even if the session timeout is only 1ms, there is still a chance that the write does not time out. ### Modifications Run each test in `OffsetTopicWriteTimeoutTest` 10 times and check the errors could only be the expected error or just `NONE`. (cherry picked from commit dfb328570c36ceaa2d0a39322f7f7145e1618d87) --- .../kop/OffsetTopicWriteTimeoutTest.java | 56 ++++++++++++++++--- 1 file changed, 47 insertions(+), 9 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/OffsetTopicWriteTimeoutTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/OffsetTopicWriteTimeoutTest.java index b5d74bdd7a..32c47a9d7b 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/OffsetTopicWriteTimeoutTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/OffsetTopicWriteTimeoutTest.java @@ -21,10 +21,15 @@ import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; import java.time.Duration; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -90,8 +95,31 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - @Test(timeOut = 30000) - public void testSyncGroup() throws Exception { + private Map computeErrorsCount(Supplier supplier) { + final var errorsCount = new HashMap(); + for (int i = 0; i < 10; i++) { + final var error = supplier.get(); + errorsCount.merge(error, 1, Integer::sum); + } + return errorsCount; + } + + @Test(timeOut = 60000) + public void testSyncGroup() { + final var errorsCount = computeErrorsCount(this::syncGroupTimeoutError); + for (int i = 0; i < 10; i++) { + final var error = syncGroupTimeoutError(); + errorsCount.merge(error, 1, Integer::sum); + } + // There is a little chance that timeout does not happen + Assert.assertTrue(errorsCount.containsKey(Errors.REBALANCE_IN_PROGRESS)); + if (errorsCount.containsKey(Errors.NONE)) { + Assert.assertEquals(errorsCount.keySet(), + new HashSet<>(Arrays.asList(Errors.NONE, Errors.REBALANCE_IN_PROGRESS))); + } + } + + private Errors syncGroupTimeoutError() { final var protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(); protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol().setName("range").setMetadata("".getBytes())); final var joinGroupRequest = buildRequest(new JoinGroupRequest.Builder( @@ -101,7 +129,7 @@ public void testSyncGroup() throws Exception { ), serviceAddress); final var joinGroupFuture = new CompletableFuture(); handler.handleJoinGroupRequest(joinGroupRequest, joinGroupFuture); - final var joinGroupResponse = (JoinGroupResponse) joinGroupFuture.get(); + final var joinGroupResponse = (JoinGroupResponse) joinGroupFuture.join(); Assert.assertEquals(joinGroupResponse.error(), Errors.NONE); final var syncGroupRequest = buildRequest(new SyncGroupRequest.Builder( @@ -111,13 +139,22 @@ public void testSyncGroup() throws Exception { var syncGroupFuture = new CompletableFuture(); handler.handleSyncGroupRequest(syncGroupRequest, syncGroupFuture); - final var syncGroupResponse = (SyncGroupResponse) syncGroupFuture.get(); - Assert.assertEquals(syncGroupResponse.errorCounts().keySet(), - Collections.singleton(Errors.REBALANCE_IN_PROGRESS)); + final var syncGroupResponse = (SyncGroupResponse) syncGroupFuture.join(); + return syncGroupResponse.error(); } @Test(timeOut = 30000) - public void testOffsetCommit() throws Exception { + public void testOffsetCommit() { + final var errorsCount = computeErrorsCount(this::offsetCommitTimeoutError); + // There is a little chance that timeout does not happen + Assert.assertTrue(errorsCount.containsKey(Errors.REQUEST_TIMED_OUT)); + if (errorsCount.containsKey(Errors.NONE)) { + Assert.assertEquals(errorsCount.keySet(), + new HashSet<>(Arrays.asList(Errors.NONE, Errors.REQUEST_TIMED_OUT))); + } + } + + private Errors offsetCommitTimeoutError() { final var offsetCommit = new OffsetCommitRequest.Builder(new OffsetCommitRequestData() .setGroupId(DEFAULT_GROUP_ID) .setTopics(Collections.singletonList(KafkaCommonTestUtils.newOffsetCommitRequestPartitionData( @@ -128,7 +165,8 @@ public void testOffsetCommit() throws Exception { final var request = buildRequest(offsetCommit, serviceAddress); final var future = new CompletableFuture(); handler.handleOffsetCommitRequest(request, future); - final var response = (OffsetCommitResponse) future.get(); - Assert.assertEquals(response.errorCounts().keySet(), Collections.singleton(Errors.REQUEST_TIMED_OUT)); + final var response = (OffsetCommitResponse) future.join(); + Assert.assertEquals(response.errorCounts().size(), 1); + return response.errorCounts().keySet().stream().findAny().orElse(Errors.UNKNOWN_SERVER_ERROR); } }