Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topic and partition statuses #219

Merged
merged 7 commits into from
Dec 7, 2023
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,12 @@ public Supplier<Admin> adminClientSupplier(Function<Map<String, Object>, Admin>
config.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10000");

if (log.isDebugEnabled()) {
if (log.isTraceEnabled()) {
String msg = config.entrySet()
.stream()
.map(entry -> "\t%s = %s".formatted(entry.getKey(), entry.getValue()))
.collect(Collectors.joining("\n", "AdminClient configuration:\n", ""));
log.debug(msg);
log.trace(msg);
}

Admin client = adminBuilder.apply(config);
Expand Down Expand Up @@ -167,6 +167,7 @@ public Supplier<Consumer<byte[], byte[]>> consumerSupplier(Supplier<Kafka> clust
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 50_000);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5000);

@SuppressWarnings("resource") // No leak, it will be closed by the disposer
Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(config);
Expand All @@ -192,7 +193,7 @@ Map<String, Object> buildConfiguration(Kafka cluster, ListenerStatus listenerSta
boolean saslEnabled;

if (authType.isBlank()) {
log.debug("Broker authentication/SASL disabled");
log.trace("Broker authentication/SASL disabled");
saslEnabled = false;
} else {
saslEnabled = true;
Expand Down Expand Up @@ -222,7 +223,7 @@ Map<String, Object> buildConfiguration(Kafka cluster, ListenerStatus listenerSta
}

void configureOAuthBearer(Map<String, Object> config) {
log.debug("SASL/OAUTHBEARER enabled");
log.trace("SASL/OAUTHBEARER enabled");
config.put(SaslConfigs.SASL_MECHANISM, OAUTHBEARER);
config.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");
// Do not attempt token refresh ahead of expiration (ExpiringCredentialRefreshingLogin)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,13 @@ public CompletionStage<Response> listTopics(
Topic.Fields.NAME,
Topic.Fields.VISIBILITY,
Topic.Fields.PARTITIONS,
Topic.Fields.NUM_PARTITIONS,
Topic.Fields.AUTHORIZED_OPERATIONS,
Topic.Fields.CONFIGS,
Topic.Fields.RECORD_COUNT,
Topic.Fields.TOTAL_LEADER_LOG_BYTES,
Topic.Fields.CONSUMER_GROUPS,
Topic.Fields.STATUS,
},
payload = ErrorCategory.InvalidQueryParameter.class)
@Parameter(
Expand All @@ -168,11 +170,13 @@ public CompletionStage<Response> listTopics(
Topic.Fields.NAME,
Topic.Fields.VISIBILITY,
Topic.Fields.PARTITIONS,
Topic.Fields.NUM_PARTITIONS,
Topic.Fields.AUTHORIZED_OPERATIONS,
Topic.Fields.CONFIGS,
Topic.Fields.RECORD_COUNT,
Topic.Fields.TOTAL_LEADER_LOG_BYTES,
Topic.Fields.CONSUMER_GROUPS,
Topic.Fields.STATUS,
}))
List<String> fields,

Expand Down Expand Up @@ -237,11 +241,13 @@ public CompletionStage<Response> describeTopic(
Topic.Fields.NAME,
Topic.Fields.VISIBILITY,
Topic.Fields.PARTITIONS,
Topic.Fields.NUM_PARTITIONS,
Topic.Fields.AUTHORIZED_OPERATIONS,
Topic.Fields.CONFIGS,
Topic.Fields.RECORD_COUNT,
Topic.Fields.TOTAL_LEADER_LOG_BYTES,
Topic.Fields.CONSUMER_GROUPS,
Topic.Fields.STATUS,
},
payload = ErrorCategory.InvalidQueryParameter.class)
@Parameter(
Expand All @@ -254,11 +260,13 @@ public CompletionStage<Response> describeTopic(
Topic.Fields.NAME,
Topic.Fields.VISIBILITY,
Topic.Fields.PARTITIONS,
Topic.Fields.NUM_PARTITIONS,
Topic.Fields.AUTHORIZED_OPERATIONS,
Topic.Fields.CONFIGS,
Topic.Fields.RECORD_COUNT,
Topic.Fields.TOTAL_LEADER_LOG_BYTES,
Topic.Fields.CONSUMER_GROUPS,
Topic.Fields.STATUS,
}))
List<String> fields,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.github.eyefloaters.console.api.support.ListRequestContext;

import static java.util.Comparator.comparing;
import static java.util.Comparator.nullsLast;

@Schema(name = "KafkaClusterAttributes")
@JsonFilter("fieldFilter")
Expand Down Expand Up @@ -52,7 +53,7 @@ public static class Fields {
public static final String CONDITIONS = "conditions";

static final Comparator<KafkaCluster> ID_COMPARATOR =
comparing(KafkaCluster::getId);
comparing(KafkaCluster::getId, nullsLast(String::compareTo));

static final Map<String, Map<Boolean, Comparator<KafkaCluster>>> COMPARATORS =
ComparatorBuilder.bidirectional(
Expand Down Expand Up @@ -176,7 +177,7 @@ public static KafkaCluster fromCursor(JsonObject cursor) {

public String toCursor(List<String> sortFields) {
JsonObjectBuilder cursor = Json.createObjectBuilder()
.add("id", id);
.add("id", id == null ? Json.createValue("") : Json.createValue(id));

JsonObjectBuilder attrBuilder = Json.createObjectBuilder();
maybeAddAttribute(attrBuilder, sortFields, Fields.NAME, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
Expand Down Expand Up @@ -37,12 +38,13 @@ public PartitionInfo(int partition, List<PartitionReplica> replicas, Integer lea
}

/**
 * Maps a Kafka client {@link TopicPartitionInfo} to the API model.
 *
 * <p>The leader node is {@code null} when the partition is offline, so the
 * leader ID must be extracted null-safely rather than via
 * {@code info.leader().id()}.
 *
 * @param info partition info returned by the Kafka admin client
 * @return the API model of the partition
 */
public static PartitionInfo fromKafkaModel(TopicPartitionInfo info) {
    // null leader => offline partition; propagate null instead of throwing NPE
    Integer leaderId = Optional.ofNullable(info.leader()).map(Node::id).orElse(null);
    List<Integer> isr = info.isr().stream().map(Node::id).toList();
    List<PartitionReplica> replicas = info.replicas()
        .stream()
        .map(replica -> PartitionReplica.fromKafkaModel(replica, isr))
        .toList();
    return new PartitionInfo(info.partition(), replicas, leaderId);
}

static <P> Either<P, Error> primaryOrError(Either<P, Throwable> either, String message) {
Expand Down Expand Up @@ -80,6 +82,23 @@ public Map<String, Either<OffsetInfo, Error>> getOffsets() {
return offsets;
}

/**
 * A partition is considered on-line when it currently has an elected leader.
 *
 * @return true when a leader ID is present, false otherwise
 */
public boolean online() {
    return Objects.nonNull(leaderId);
}

/**
 * Derived partition status: {@code Offline} when no leader is elected,
 * {@code UnderReplicated} when at least one replica is out of sync, and
 * {@code FullyReplicated} otherwise.
 */
@JsonProperty
public String status() {
    if (!online()) {
        return "Offline";
    }

    boolean allInSync = replicas.stream().allMatch(PartitionReplica::inSync);
    return allInSync ? "FullyReplicated" : "UnderReplicated";
}

/**
* Calculates the record count as the latest offset minus the earliest offset
* when both offsets are available only. When either the latest or the earliest
Expand Down Expand Up @@ -111,12 +130,17 @@ public Long leaderLocalStorage() {
}

Optional<Long> getOffset(String key) {
return Optional.ofNullable(offsets.get(key))
return Optional.ofNullable(offsets)
.map(o -> o.get(key))
.flatMap(Either::getOptionalPrimary)
.map(OffsetInfo::offset);
}

Optional<PartitionReplica> getReplica(int nodeId) {
Optional<PartitionReplica> getReplica(Integer nodeId) {
if (nodeId == null) {
return Optional.empty();
}

return replicas.stream()
.filter(replica -> replica.nodeId() == nodeId)
.findFirst();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import jakarta.json.Json;
import jakarta.json.JsonObject;
Expand All @@ -37,11 +39,13 @@ public static final class Fields {
public static final String NAME = "name";
public static final String VISIBILITY = "visibility";
public static final String PARTITIONS = "partitions";
public static final String NUM_PARTITIONS = "numPartitions";
public static final String AUTHORIZED_OPERATIONS = "authorizedOperations";
public static final String CONFIGS = "configs";
public static final String RECORD_COUNT = "recordCount";
public static final String TOTAL_LEADER_LOG_BYTES = "totalLeaderLogBytes";
public static final String CONSUMER_GROUPS = "consumerGroups";
public static final String STATUS = "status";
static final Pattern CONFIG_KEY = Pattern.compile("^configs\\.\"([^\"]+)\"$");

static final Comparator<Topic> ID_COMPARATOR =
Expand All @@ -63,8 +67,10 @@ RECORD_COUNT, nullsLast(comparing(topic -> topic.attributes.getRecordCount())),
public static final String LIST_DEFAULT = NAME + ", " + VISIBILITY;
public static final String DESCRIBE_DEFAULT =
NAME + ", "
+ STATUS + ", "
+ VISIBILITY + ", "
+ PARTITIONS + ", "
+ NUM_PARTITIONS + ", "
+ AUTHORIZED_OPERATIONS + ", "
+ RECORD_COUNT + ", "
+ TOTAL_LEADER_LOG_BYTES;
Expand Down Expand Up @@ -147,6 +153,30 @@ static class Attributes {
this.internal = internal;
}

/**
 * Derived topic status rolled up from the per-partition statuses:
 * {@code Offline} when every partition is offline, {@code PartiallyOffline}
 * when only some are, {@code UnderReplicated} when any partition is
 * under-replicated, and {@code FullyReplicated} otherwise (including when
 * the partition list is unavailable).
 */
@JsonProperty
public String status() {
    return partitions.getOptionalPrimary()
        .map(p -> {
            // Materialize the statuses once; the original re-streamed the
            // partitions for each check via a Supplier<Stream>.
            List<String> statuses = p.stream().map(PartitionInfo::status).toList();
            long offlinePartitions = statuses.stream().filter("Offline"::equals).count();

            if (offlinePartitions > 0) {
                return offlinePartitions < p.size() ? "PartiallyOffline" : "Offline";
            }

            if (statuses.contains("UnderReplicated")) {
                return "UnderReplicated";
            }

            return "FullyReplicated";
        })
        .orElse("FullyReplicated");
}

@JsonProperty
@Schema(readOnly = true, description = """
Derived property indicating whether this is an internal (i.e. system, private) topic or
Expand All @@ -158,6 +188,12 @@ public String visibility() {
return internal || name.startsWith("_") ? "internal" : "external";
}

// Derived count of partitions; zero when the partition list could not be
// retrieved (error side of the Either).
@JsonProperty
@Schema(readOnly = true, description = "The number of partitions in this topic")
public Integer numPartitions() {
    return partitions.getOptionalPrimary()
        .map(List::size)
        .orElse(0);
}

/**
* Calculates the record count for the entire topic as the sum
* of the record counts of each individual partition. When the partitions
Expand Down Expand Up @@ -312,6 +348,10 @@ public String visibility() {
return attributes.visibility();
}

/**
 * Derived topic status, delegated to the attributes model. One of:
 * FullyReplicated, UnderReplicated, PartiallyOffline, Offline.
 */
public String status() {
    return attributes.status();
}

public Either<List<PartitionInfo>, Error> partitions() {
return attributes.partitions;
}
Expand All @@ -328,6 +368,14 @@ public DataList<Identifier> consumerGroups() {
return relationships.consumerGroups;
}

/**
 * @return true when every partition of this topic has an elected leader
 *         (vacuously true when the partition list is unavailable or empty),
 *         false otherwise
 */
public boolean partitionsOnline() {
    return attributes.partitions.getOptionalPrimary()
        .map(Collection::stream)
        .orElseGet(Stream::empty)
        // allMatch with the predicate directly; no need to box to Boolean
        // and compare with Boolean.TRUE::equals
        .allMatch(PartitionInfo::online);
}

ConfigEntry configEntry(String key) {
return Optional.ofNullable(attributes.configs)
.flatMap(Either::getOptionalPrimary)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,25 @@ public class TopicFilterParams {
node = "filter[visibility]")
FetchFilter visibilityFilter;

// Optional `filter[status]` query parameter. Only the `eq` and `in`
// operators are accepted, with at least one operand; the filter is matched
// against Topic#status() in buildPredicates below.
@QueryParam("filter[status]")
@Parameter(
description = "Retrieve only topics matching the status identified by this parameter",
schema = @Schema(implementation = String[].class, minItems = 2),
explode = Explode.FALSE)
@Expression(
when = "self != null",
value = "self.operator == 'eq' || self.operator == 'in'",
message = "unsupported filter operator, supported values: [ 'eq', 'in' ]",
payload = ErrorCategory.InvalidQueryParameter.class,
node = "filter[status]")
@Expression(
when = "self != null",
value = "self.operands.size() >= 1",
message = "at least 1 operand is required",
payload = ErrorCategory.InvalidQueryParameter.class,
node = "filter[status]")
FetchFilter statusFilter;

public List<Predicate<Topic>> buildPredicates() {
List<Predicate<Topic>> predicates = new ArrayList<>(3);

Expand All @@ -89,6 +108,10 @@ public List<Predicate<Topic>> buildPredicates() {
predicates.add(new FetchFilterPredicate<>(idFilter, Topic::getId));
}

if (statusFilter != null) {
predicates.add(new FetchFilterPredicate<>(statusFilter, Topic::status));
}

return predicates;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ void setKafkaClusterProperties(KafkaCluster cluster, Kafka kafka) {
.sorted(comparator)
.map(listener -> new KafkaListener(
listener.getType().toValue(),
listenerStatus(kafka, listener).getBootstrapServers(),
listenerStatus(kafka, listener).map(ListenerStatus::getBootstrapServers).orElse(null),
getAuthType(listener).orElse(null)))
.toList();

Expand Down Expand Up @@ -211,7 +211,7 @@ public static Optional<ListenerStatus> consoleListener(Kafka kafka) {
listenerSortKey(l1, Annotations.CONSOLE_LISTENER),
listenerSortKey(l2, Annotations.CONSOLE_LISTENER)))
.findFirst()
.map(listener -> listenerStatus(kafka, listener));
.flatMap(listener -> listenerStatus(kafka, listener));
}

static boolean supportedAuthentication(GenericKafkaListener listener) {
Expand Down Expand Up @@ -255,16 +255,15 @@ static boolean annotatedListener(GenericKafkaListener listener, Annotations list
.orElse(false);
}

static ListenerStatus listenerStatus(Kafka kafka, GenericKafkaListener listener) {
static Optional<ListenerStatus> listenerStatus(Kafka kafka, GenericKafkaListener listener) {
String listenerName = listener.getName();

return Optional.ofNullable(kafka.getStatus())
.map(KafkaStatus::getListeners)
.map(Collection::stream)
.orElseGet(Stream::empty)
.filter(listenerStatus -> listenerName.equals(listenerStatus.getName()))
.findFirst()
.orElse(null);
.findFirst();
}

public static Optional<String> getAuthType(Kafka kafka, ListenerStatus listener) {
Expand Down
Loading