From 01e51c96c6a7d48a24244902e903581660f53d3f Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Fri, 5 Jan 2024 11:07:04 -0500 Subject: [PATCH] Add API support for empty consumer group id Signed-off-by: Michael Edgar --- .../api/service/ConsumerGroupService.java | 18 ++++++++------ .../console/api/ConsumerGroupsResourceIT.java | 24 +++++++++++++++++++ .../kafka/systemtest/utils/ConsumerUtils.java | 19 +++++++++++++-- 3 files changed, 52 insertions(+), 9 deletions(-) diff --git a/api/src/main/java/com/github/eyefloaters/console/api/service/ConsumerGroupService.java b/api/src/main/java/com/github/eyefloaters/console/api/service/ConsumerGroupService.java index db3fa8e1e..55fba2975 100644 --- a/api/src/main/java/com/github/eyefloaters/console/api/service/ConsumerGroupService.java +++ b/api/src/main/java/com/github/eyefloaters/console/api/service/ConsumerGroupService.java @@ -156,8 +156,9 @@ CompletionStage> listConsumerGroups(List groupIds, L .thenCompose(groups -> augmentList(adminClient, groups, includes)); } - public CompletionStage describeConsumerGroup(String groupId, List includes) { + public CompletionStage describeConsumerGroup(String requestGroupId, List includes) { Admin adminClient = clientSupplier.get(); + String groupId = preprocessGroupId(requestGroupId); return assertConsumerGroupExists(adminClient, groupId) .thenCompose(nothing -> describeConsumerGroups(adminClient, List.of(groupId), includes)) @@ -209,12 +210,12 @@ public CompletionStage>> listConsumerGroupMembership(Co public CompletionStage patchConsumerGroup(ConsumerGroup patch) { Admin adminClient = clientSupplier.get(); - String groupId = patch.getGroupId(); + String groupId = preprocessGroupId(patch.getGroupId()); return assertConsumerGroupExists(adminClient, groupId) .thenComposeAsync(nothing -> Optional.ofNullable(patch.getOffsets()) .filter(Predicate.not(Collection::isEmpty)) - .map(patchedOffsets -> alterConsumerGroupOffsets(adminClient, patch)) + .map(patchedOffsets -> alterConsumerGroupOffsets(adminClient, groupId, patch)) .orElseGet(() -> CompletableFuture.completedStage(null)), threadContext.currentContextExecutor()); } @@ -230,9 +231,7 @@ CompletionStage assertConsumerGroupExists(Admin adminClient, String groupI }); } - CompletionStage alterConsumerGroupOffsets(Admin adminClient, ConsumerGroup patch) { - String groupId = patch.getGroupId(); - + CompletionStage alterConsumerGroupOffsets(Admin adminClient, String groupId, ConsumerGroup patch) { var topicsToDescribe = patch.getOffsets() .stream() .map(OffsetAndMetadata::topicId) @@ -389,8 +388,9 @@ static CompletableFuture allOf(Collection deleteConsumerGroup(String groupId) { + public CompletionStage deleteConsumerGroup(String requestGroupId) { Admin adminClient = clientSupplier.get(); + String groupId = preprocessGroupId(requestGroupId); return adminClient.deleteConsumerGroups(List.of(groupId)) .deletedGroups() @@ -571,4 +571,8 @@ void addOffsets(ConsumerGroup group, group.setOffsets(offsets); } } + + static String preprocessGroupId(String groupId) { + return " ".equals(groupId) ? "" : groupId; + } } diff --git a/api/src/test/java/com/github/eyefloaters/console/api/ConsumerGroupsResourceIT.java b/api/src/test/java/com/github/eyefloaters/console/api/ConsumerGroupsResourceIT.java index e61fde590..3b0ab9d90 100644 --- a/api/src/test/java/com/github/eyefloaters/console/api/ConsumerGroupsResourceIT.java +++ b/api/src/test/java/com/github/eyefloaters/console/api/ConsumerGroupsResourceIT.java @@ -454,6 +454,30 @@ void testDescribeConsumerGroupWithNoSuchGroup() { } } + @Test + void testDescribeConsumerGroupWithEmptyGroupId() { + String topic1 = "t1-" + UUID.randomUUID().toString(); + String group1 = ""; + String client1 = "c1-" + UUID.randomUUID().toString(); + + try (var consumer = groupUtils.request() + .groupId(group1) + .topic(topic1, 2) + .clientId(client1) + .autoClose(false) + // Don't actually produce or consume anything + .messagesPerTopic(0) + .consumeMessages(0) + .consume()) { + // must be fetched with a single blank space character + whenRequesting(req -> req.get("{groupId}", clusterId1, " ")) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data.id", is(group1)) + .body("data.attributes.state", is(Matchers.notNullValue(String.class))); + } + } + @Test void testDescribeConsumerGroupWithFetchGroupOffsetsError() { Answer listConsumerGroupOffsetsFailed = args -> { diff --git a/api/src/test/java/com/github/eyefloaters/console/kafka/systemtest/utils/ConsumerUtils.java b/api/src/test/java/com/github/eyefloaters/console/kafka/systemtest/utils/ConsumerUtils.java index 3e10fdb63..9ada5deb7 100644 --- a/api/src/test/java/com/github/eyefloaters/console/kafka/systemtest/utils/ConsumerUtils.java +++ b/api/src/test/java/com/github/eyefloaters/console/kafka/systemtest/utils/ConsumerUtils.java @@ -11,7 +11,6 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; @@ -320,7 +319,23 @@ void consumeMessages(ConsumerRequest consumerRequest, ConsumerResponse response) response.consumer = new KafkaConsumer<>(consumerConfig); try { - response.consumer.subscribe(consumerRequest.topics.stream().map(NewTopic::name).collect(Collectors.toList())); + List topics = consumerRequest.topics + .stream() + .map(NewTopic::name) + .toList(); + + if (consumerRequest.groupId.isEmpty()) { + List assignments = topics.stream() + .map(response.consumer::partitionsFor) + .flatMap(partitions -> partitions.stream().map(p -> new TopicPartition(p.topic(), p.partition()))) + .distinct() + .toList(); + + // Must use assign instead of subscribe to support empty group.id + response.consumer.assign(assignments); + } else { + response.consumer.subscribe(topics); + } if (consumerRequest.consumeMessages < 1 && consumerRequest.messagesPerTopic < 1) { var records = response.consumer.poll(Duration.ofSeconds(5));