Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Fix flaky OffsetTopicWriteTimeoutTest (#1914)
Browse files Browse the repository at this point in the history
### Motivation

Even if the session timeout is only 1ms, there is still a chance that
the write does not time out.

### Modifications

Run each test in `OffsetTopicWriteTimeoutTest` 10 times and check the
errors could only be the expected error or just `NONE`.
  • Loading branch information
BewareMyPower authored Jun 27, 2023
1 parent a6eb7a6 commit dfb3285
Showing 1 changed file with 47 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand Down Expand Up @@ -90,8 +95,31 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test(timeOut = 30000)
public void testSyncGroup() throws Exception {
private Map<Errors, Integer> computeErrorsCount(Supplier<Errors> supplier) {
final var errorsCount = new HashMap<Errors, Integer>();
for (int i = 0; i < 10; i++) {
final var error = supplier.get();
errorsCount.merge(error, 1, Integer::sum);
}
return errorsCount;
}

@Test(timeOut = 60000)
public void testSyncGroup() {
final var errorsCount = computeErrorsCount(this::syncGroupTimeoutError);
for (int i = 0; i < 10; i++) {
final var error = syncGroupTimeoutError();
errorsCount.merge(error, 1, Integer::sum);
}
// There is a little chance that timeout does not happen
Assert.assertTrue(errorsCount.containsKey(Errors.REBALANCE_IN_PROGRESS));
if (errorsCount.containsKey(Errors.NONE)) {
Assert.assertEquals(errorsCount.keySet(),
new HashSet<>(Arrays.asList(Errors.NONE, Errors.REBALANCE_IN_PROGRESS)));
}
}

private Errors syncGroupTimeoutError() {
final var protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol().setName("range").setMetadata("".getBytes()));
final var joinGroupRequest = buildRequest(new JoinGroupRequest.Builder(
Expand All @@ -101,7 +129,7 @@ public void testSyncGroup() throws Exception {
), serviceAddress);
final var joinGroupFuture = new CompletableFuture<AbstractResponse>();
handler.handleJoinGroupRequest(joinGroupRequest, joinGroupFuture);
final var joinGroupResponse = (JoinGroupResponse) joinGroupFuture.get();
final var joinGroupResponse = (JoinGroupResponse) joinGroupFuture.join();
Assert.assertEquals(joinGroupResponse.error(), Errors.NONE);

final var syncGroupRequest = buildRequest(new SyncGroupRequest.Builder(
Expand All @@ -111,13 +139,22 @@ public void testSyncGroup() throws Exception {
var syncGroupFuture = new CompletableFuture<AbstractResponse>();

handler.handleSyncGroupRequest(syncGroupRequest, syncGroupFuture);
final var syncGroupResponse = (SyncGroupResponse) syncGroupFuture.get();
Assert.assertEquals(syncGroupResponse.errorCounts().keySet(),
Collections.singleton(Errors.REBALANCE_IN_PROGRESS));
final var syncGroupResponse = (SyncGroupResponse) syncGroupFuture.join();
return syncGroupResponse.error();
}

@Test(timeOut = 30000)
public void testOffsetCommit() throws Exception {
public void testOffsetCommit() {
final var errorsCount = computeErrorsCount(this::offsetCommitTimeoutError);
// There is a little chance that timeout does not happen
Assert.assertTrue(errorsCount.containsKey(Errors.REQUEST_TIMED_OUT));
if (errorsCount.containsKey(Errors.NONE)) {
Assert.assertEquals(errorsCount.keySet(),
new HashSet<>(Arrays.asList(Errors.NONE, Errors.REQUEST_TIMED_OUT)));
}
}

private Errors offsetCommitTimeoutError() {
final var offsetCommit = new OffsetCommitRequest.Builder(new OffsetCommitRequestData()
.setGroupId(DEFAULT_GROUP_ID)
.setTopics(Collections.singletonList(KafkaCommonTestUtils.newOffsetCommitRequestPartitionData(
Expand All @@ -128,7 +165,8 @@ public void testOffsetCommit() throws Exception {
final var request = buildRequest(offsetCommit, serviceAddress);
final var future = new CompletableFuture<AbstractResponse>();
handler.handleOffsetCommitRequest(request, future);
final var response = (OffsetCommitResponse) future.get();
Assert.assertEquals(response.errorCounts().keySet(), Collections.singleton(Errors.REQUEST_TIMED_OUT));
final var response = (OffsetCommitResponse) future.join();
Assert.assertEquals(response.errorCounts().size(), 1);
return response.errorCounts().keySet().stream().findAny().orElse(Errors.UNKNOWN_SERVER_ERROR);
}
}

0 comments on commit dfb3285

Please sign in to comment.