Skip to content

Commit

Permalink
Limit consumer group API calls to page, add log end offset to describe (#279)
Browse files Browse the repository at this point in the history

Also support consumer group ID (name) and state filtering on list
operation

Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar authored Dec 19, 2023
1 parent f349372 commit f447a54
Show file tree
Hide file tree
Showing 12 changed files with 254 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.microprofile.openapi.annotations.tags.Tag;

import com.github.eyefloaters.console.api.model.ConsumerGroup;
import com.github.eyefloaters.console.api.model.ConsumerGroupFilterParams;
import com.github.eyefloaters.console.api.model.ListFetchParams;
import com.github.eyefloaters.console.api.service.ConsumerGroupService;
import com.github.eyefloaters.console.api.support.ErrorCategory;
Expand Down Expand Up @@ -104,10 +105,19 @@ public CompletionStage<Response> listConsumerGroups(

@BeanParam
@Valid
ListFetchParams listParams) {
ListFetchParams listParams,

@BeanParam
@Valid
ConsumerGroupFilterParams filters) {

requestedFields.accept(fields);
ListRequestContext<ConsumerGroup> listSupport = new ListRequestContext<>(ConsumerGroup.Fields.COMPARATOR_BUILDER, uriInfo.getRequestUri(), listParams, ConsumerGroup::fromCursor);
ListRequestContext<ConsumerGroup> listSupport = new ListRequestContext<>(
filters.buildPredicates(),
ConsumerGroup.Fields.COMPARATOR_BUILDER,
uriInfo.getRequestUri(),
listParams,
ConsumerGroup::fromCursor);

return consumerGroupService.listConsumerGroups(fields, listSupport)
.thenApply(groups -> new ConsumerGroup.ListResponse(groups, listSupport))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.github.eyefloaters.console.api.model;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

import jakarta.ws.rs.QueryParam;

import org.eclipse.microprofile.openapi.annotations.enums.Explode;
import org.eclipse.microprofile.openapi.annotations.media.Schema;
import org.eclipse.microprofile.openapi.annotations.parameters.Parameter;

import com.github.eyefloaters.console.api.support.ErrorCategory;
import com.github.eyefloaters.console.api.support.FetchFilterPredicate;

import io.xlate.validation.constraints.Expression;
import io.xlate.validation.constraints.Expression.ExceptionalValue;

/**
 * Bean holding the {@code filter[...]} query parameters supported by the
 * consumer-group list operation. Each field is both validated (via
 * {@code @Expression}) and documented for OpenAPI (via {@code @Parameter}).
 * Use {@link #buildPredicates()} to convert the submitted filters into
 * predicates consumed by the list request context.
 */
public class ConsumerGroupFilterParams {

    // Filter on the consumer group ID. Supports equality, membership, and
    // SQL-like pattern matching operators.
    @QueryParam("filter[id]")
    @Parameter(
        description = "Retrieve only consumer groups with an ID matching this parameter",
        schema = @Schema(implementation = String[].class, minItems = 2),
        explode = Explode.FALSE)
    @Expression(
        when = "self != null",
        value = "self.operator == 'eq' || self.operator == 'in' || self.operator == 'like'",
        message = "unsupported filter operator, supported values: [ 'eq', 'in', 'like' ]",
        payload = ErrorCategory.InvalidQueryParameter.class,
        node = "filter[id]")
    @Expression(
        when = "self != null",
        value = "self.operands.size() >= 1",
        message = "at least 1 operand is required",
        payload = ErrorCategory.InvalidQueryParameter.class,
        node = "filter[state]")
    FetchFilter idFilter;

    // Filter on the consumer group state. Only equality/membership operators
    // make sense for an enumerated value, and every operand must parse to a
    // known (non-UNKNOWN) org.apache.kafka.common.ConsumerGroupState.
    @QueryParam("filter[state]")
    @Parameter(
        description = "Retrieve only consumer groups matching the state identified by this parameter",
        schema = @Schema(implementation = String[].class, minItems = 2),
        explode = Explode.FALSE)
    @Expression(
        when = "self != null",
        value = "self.operator == 'eq' || self.operator == 'in'",
        message = "unsupported filter operator, supported values: [ 'eq', 'in' ]",
        payload = ErrorCategory.InvalidQueryParameter.class,
        node = "filter[state]")
    @Expression(
        when = "self != null",
        value = "self.operands.size() >= 1",
        message = "at least 1 operand is required",
        payload = ErrorCategory.InvalidQueryParameter.class,
        node = "filter[state]")
    @Expression(
        when = "self != null && self.operands.size() >= 1",
        classImports = "org.apache.kafka.common.ConsumerGroupState",
        value = """
            self.operands.stream()
                .map(state -> ConsumerGroupState.valueOf(state))
                .noneMatch(state -> state == ConsumerGroupState.UNKNOWN)
            """,
        // valueOf throws for unrecognized names; treat that as a failed check
        exceptionalValue = ExceptionalValue.UNSET,
        message = "operands list contains an invalid consumer group state",
        payload = ErrorCategory.InvalidQueryParameter.class,
        node = "filter[state]")
    FetchFilter stateFilter;

    /**
     * Converts the submitted filters to predicates over {@link ConsumerGroup}.
     * Absent filters contribute no predicate.
     *
     * @return the (possibly empty) list of predicates for the present filters
     */
    public List<Predicate<ConsumerGroup>> buildPredicates() {
        List<Predicate<ConsumerGroup>> predicates = new ArrayList<>(2);

        if (idFilter != null) {
            // Name the predicate after its query parameter, consistent with the
            // state filter below, so consumers may identify it via name().
            predicates.add(new FetchFilterPredicate<>("filter[id]", idFilter, ConsumerGroup::getGroupId));
        }

        if (stateFilter != null) {
            // Named so the service layer can recognize the state filter and
            // push it down to the Kafka Admin list request.
            predicates.add(new FetchFilterPredicate<>("filter[state]", stateFilter, ConsumerGroup::getState));
        }

        return predicates;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ be made to the consumer group offset(s) for the partition.
> offset,

@Schema(readOnly = true)
long lag,
Long logEndOffset,

@Schema(readOnly = true)
Long lag,

String metadata,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
Expand All @@ -50,6 +52,7 @@
import com.github.eyefloaters.console.api.model.PartitionInfo;
import com.github.eyefloaters.console.api.model.Topic;
import com.github.eyefloaters.console.api.support.ConsumerGroupValidation;
import com.github.eyefloaters.console.api.support.FetchFilterPredicate;
import com.github.eyefloaters.console.api.support.KafkaOffsetSpec;
import com.github.eyefloaters.console.api.support.ListRequestContext;
import com.github.eyefloaters.console.api.support.UnknownTopicIdPatch;
Expand Down Expand Up @@ -119,21 +122,38 @@ public CompletionStage<List<ConsumerGroup>> listConsumerGroups(String topicId, L
CompletionStage<List<ConsumerGroup>> listConsumerGroups(List<String> groupIds, List<String> includes, ListRequestContext<ConsumerGroup> listSupport) {
Admin adminClient = clientSupplier.get();

return adminClient.listConsumerGroups()
Set<ConsumerGroupState> states = listSupport.filters()
.stream()
.filter(FetchFilterPredicate.class::isInstance)
.map(FetchFilterPredicate.class::cast)
.filter(filter -> "filter[state]".equals(filter.name()))
.map(filter -> {
@SuppressWarnings("unchecked")
List<String> operands = filter.operands();
return operands.stream()
.map(ConsumerGroupState::valueOf)
.collect(Collectors.toSet());
})
.findFirst()
.orElse(null);

return adminClient.listConsumerGroups(new ListConsumerGroupsOptions()
.inStates(states))
.valid()
.toCompletionStage()
.thenApply(groups -> groups.stream()
.filter(group -> groupIds.isEmpty() || groupIds.contains(group.groupId()))
.map(ConsumerGroup::fromKafkaModel)
.toList())
.thenCompose(groups -> augmentList(adminClient, groups, includes))
.thenApply(list -> list.stream()
.filter(listSupport)
.map(listSupport::tally)
.filter(listSupport::betweenCursors)
.sorted(listSupport.getSortComparator())
.dropWhile(listSupport::beforePageBegin)
.takeWhile(listSupport::pageCapacityAvailable)
.toList());
.toList())
.thenCompose(groups -> augmentList(adminClient, groups, includes));
}

public CompletionStage<ConsumerGroup> describeConsumerGroup(String groupId, List<String> includes) {
Expand All @@ -148,7 +168,11 @@ public CompletionStage<ConsumerGroup> describeConsumerGroup(String groupId, List
public CompletionStage<Map<String, List<String>>> listConsumerGroupMembership(Collection<String> topicIds) {
Admin adminClient = clientSupplier.get();

return adminClient.listConsumerGroups()
return adminClient.listConsumerGroups(new ListConsumerGroupsOptions()
.inStates(Set.of(
ConsumerGroupState.STABLE,
ConsumerGroupState.PREPARING_REBALANCE,
ConsumerGroupState.COMPLETING_REBALANCE)))
.valid()
.toCompletionStage()
.thenApply(groups -> groups.stream().map(ConsumerGroup::fromKafkaModel).toList())
Expand Down Expand Up @@ -520,25 +544,26 @@ void addOffsets(ConsumerGroup group,

groupOffsets.forEach((topicPartition, offsetsAndMetadata) -> {
long offset = offsetsAndMetadata.offset();
var endOffset = Optional.ofNullable(topicOffsets.get(topicPartition))
.map(offsetOrError -> {
if (offsetOrError.isPrimaryPresent()) {
return offsetOrError.getPrimary().offset();
}

Throwable listOffsetsError = offsetOrError.getAlternate();
String msg = "Unable to list offsets for topic/partition %s-%d"
.formatted(topicPartition.topic(), topicPartition.partition());
group.addError(new Error(msg, listOffsetsError.getMessage(), listOffsetsError));
return null;
});

offsets.add(new OffsetAndMetadata(
topicIds.get(topicPartition.topic()),
topicPartition.topic(),
topicPartition.partition(),
Either.of(offsetsAndMetadata.offset()),
Optional.ofNullable(topicOffsets.get(topicPartition))
.map(partitionOffsets -> {
// Calculate lag
if (partitionOffsets.isPrimaryPresent()) {
return partitionOffsets.getPrimary().offset() - offset;
}

Throwable listOffsetsError = partitionOffsets.getAlternate();
String msg = "Unable to list offsets for topic/partition %s-%d"
.formatted(topicPartition.topic(), topicPartition.partition());
group.addError(new Error(msg, listOffsetsError.getMessage(), listOffsetsError));
return -1L;
})
.orElse(-1L), // lag
endOffset.orElse(null), // log end offset
endOffset.map(end -> end - offset).orElse(null), // lag
offsetsAndMetadata.metadata(),
offsetsAndMetadata.leaderEpoch().orElse(null)));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@

public class FetchFilterPredicate<B, F> implements Predicate<B> {

private final String name;
private final String operator;
private final List<F> operands;
private final Function<B, F> fieldSource;
private final Pattern likePattern;

public FetchFilterPredicate(FetchFilter filter, Function<String, F> operandParser, Function<B, F> fieldSource) {
public FetchFilterPredicate(String name, FetchFilter filter, Function<String, F> operandParser, Function<B, F> fieldSource) {
this.name = name;
this.operator = filter.getOperator();
this.operands = filter.getOperands().stream().map(operandParser).toList();
this.fieldSource = fieldSource;
Expand All @@ -31,15 +33,36 @@ public FetchFilterPredicate(FetchFilter filter, Function<String, F> operandParse
}
}

/**
 * Creates an unnamed predicate whose operands are parsed from their string
 * form by {@code operandParser} before comparison.
 */
public FetchFilterPredicate(FetchFilter filter, Function<String, F> operandParser, Function<B, F> fieldSource) {
this(null, filter, operandParser, fieldSource);
}

/**
 * Creates a named predicate whose operands are used as-is (no parsing).
 * The unchecked cast is safe only when F is String; callers using this
 * constructor are expected to filter on string-valued fields.
 */
@SuppressWarnings("unchecked")
public FetchFilterPredicate(String name, FetchFilter filter, Function<B, F> fieldSource) {
this(name, filter, op -> (F) op, fieldSource);
}

/**
 * Creates an unnamed predicate whose operands are used as-is (no parsing).
 */
@SuppressWarnings("unchecked")
public FetchFilterPredicate(FetchFilter filter, Function<B, F> fieldSource) {
this(null, filter, op -> (F) op, fieldSource);
}

// Convenience accessor for single-operand operators (e.g. 'eq', 'like').
// Assumes at least one operand is present — validated upstream.
private F firstOperand() {
return operands.get(0);
}

/**
 * @return the name given to this predicate (typically the originating query
 *         parameter, e.g. {@code "filter[state]"}), or null when unnamed
 */
public String name() {
return name;
}

/**
 * @return the filter operator (e.g. {@code "eq"}, {@code "in"})
 */
public String operator() {
return operator;
}

/**
 * @return the parsed operand values this predicate compares against
 */
public List<F> operands() {
return operands;
}

@Override
public boolean test(B bean) {
F field = fieldSource.apply(bean);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ static <C> C mapCursor(String name, JsonObject source, Function<JsonObject, C> m
}
}

/**
 * Exposes the filter predicates for this request as a read-only view.
 * The returned list reflects the context's filters but cannot be modified
 * by callers.
 *
 * @return unmodifiable view of this request's filter predicates
 */
public List<Predicate<T>> filters() {
return Collections.unmodifiableList(filters);
}

@Override
public boolean test(T t) {
return filters.isEmpty() || filters.stream().allMatch(filter -> filter.test(t));
Expand Down
Loading

0 comments on commit f447a54

Please sign in to comment.