Skip to content

Commit

Permalink
Add API support for empty consumer group id
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar committed Jan 10, 2024
1 parent 9bcdeed commit 01e51c9
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,9 @@ CompletionStage<List<ConsumerGroup>> listConsumerGroups(List<String> groupIds, L
.thenCompose(groups -> augmentList(adminClient, groups, includes));
}

public CompletionStage<ConsumerGroup> describeConsumerGroup(String groupId, List<String> includes) {
public CompletionStage<ConsumerGroup> describeConsumerGroup(String requestGroupId, List<String> includes) {
Admin adminClient = clientSupplier.get();
String groupId = preprocessGroupId(requestGroupId);

return assertConsumerGroupExists(adminClient, groupId)
.thenCompose(nothing -> describeConsumerGroups(adminClient, List.of(groupId), includes))
Expand Down Expand Up @@ -209,12 +210,12 @@ public CompletionStage<Map<String, List<String>>> listConsumerGroupMembership(Co

public CompletionStage<Void> patchConsumerGroup(ConsumerGroup patch) {
Admin adminClient = clientSupplier.get();
String groupId = patch.getGroupId();
String groupId = preprocessGroupId(patch.getGroupId());

return assertConsumerGroupExists(adminClient, groupId)
.thenComposeAsync(nothing -> Optional.ofNullable(patch.getOffsets())
.filter(Predicate.not(Collection::isEmpty))
.map(patchedOffsets -> alterConsumerGroupOffsets(adminClient, patch))
.map(patchedOffsets -> alterConsumerGroupOffsets(adminClient, groupId, patch))
.orElseGet(() -> CompletableFuture.completedStage(null)),
threadContext.currentContextExecutor());
}
Expand All @@ -230,9 +231,7 @@ CompletionStage<Void> assertConsumerGroupExists(Admin adminClient, String groupI
});
}

CompletionStage<Void> alterConsumerGroupOffsets(Admin adminClient, ConsumerGroup patch) {
String groupId = patch.getGroupId();

CompletionStage<Void> alterConsumerGroupOffsets(Admin adminClient, String groupId, ConsumerGroup patch) {
var topicsToDescribe = patch.getOffsets()
.stream()
.map(OffsetAndMetadata::topicId)
Expand Down Expand Up @@ -389,8 +388,9 @@ static <F extends Object> CompletableFuture<Void> allOf(Collection<CompletableFu
});
}

public CompletionStage<Void> deleteConsumerGroup(String groupId) {
public CompletionStage<Void> deleteConsumerGroup(String requestGroupId) {
Admin adminClient = clientSupplier.get();
String groupId = preprocessGroupId(requestGroupId);

return adminClient.deleteConsumerGroups(List.of(groupId))
.deletedGroups()
Expand Down Expand Up @@ -571,4 +571,8 @@ void addOffsets(ConsumerGroup group,
group.setOffsets(offsets);
}
}

static String preprocessGroupId(String groupId) {
return " ".equals(groupId) ? "" : groupId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,30 @@ void testDescribeConsumerGroupWithNoSuchGroup() {
}
}

@Test
void testDescribeConsumerGroupWithEmptyGroupId() {
String topic1 = "t1-" + UUID.randomUUID().toString();
String group1 = "";
String client1 = "c1-" + UUID.randomUUID().toString();

try (var consumer = groupUtils.request()
.groupId(group1)
.topic(topic1, 2)
.clientId(client1)
.autoClose(false)
// Don't actually produce or consume anything
.messagesPerTopic(0)
.consumeMessages(0)
.consume()) {
// must be fetched with a single blank space character
whenRequesting(req -> req.get("{groupId}", clusterId1, " "))
.assertThat()
.statusCode(is(Status.OK.getStatusCode()))
.body("data.id", is(group1))
.body("data.attributes.state", is(Matchers.notNullValue(String.class)));
}
}

@Test
void testDescribeConsumerGroupWithFetchGroupOffsetsError() {
Answer<ListConsumerGroupOffsetsResult> listConsumerGroupOffsetsFailed = args -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
Expand Down Expand Up @@ -320,7 +319,23 @@ void consumeMessages(ConsumerRequest consumerRequest, ConsumerResponse response)
response.consumer = new KafkaConsumer<>(consumerConfig);

try {
response.consumer.subscribe(consumerRequest.topics.stream().map(NewTopic::name).collect(Collectors.toList()));
List<String> topics = consumerRequest.topics
.stream()
.map(NewTopic::name)
.toList();

if (consumerRequest.groupId.isEmpty()) {
List<TopicPartition> assignments = topics.stream()
.map(response.consumer::partitionsFor)
.flatMap(partitions -> partitions.stream().map(p -> new TopicPartition(p.topic(), p.partition())))
.distinct()
.toList();

// Must use assign instead of subscribe to support empty group.id
response.consumer.assign(assignments);
} else {
response.consumer.subscribe(topics);
}

if (consumerRequest.consumeMessages < 1 && consumerRequest.messagesPerTopic < 1) {
var records = response.consumer.poll(Duration.ofSeconds(5));
Expand Down

0 comments on commit 01e51c9

Please sign in to comment.