Brokers API improvements (#2743)
* Brokers API improvements:
1. Broker I/O rate stats added
2. Active controller property now set to the controller's node id
3. Minor refactoring
4. FE: add an indicator for the active broker controller

Co-authored-by: Mgrdich <[email protected]>
Co-authored-by: iliax <[email protected]>
Co-authored-by: Hrant Abrahamyan <[email protected]>
Co-authored-by: Mgrdich <[email protected]>
4 people authored Dec 12, 2022
1 parent 7837622 commit f9906b5
Showing 17 changed files with 256 additions and 99 deletions.
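Taken together, the backend changes route broker listing through the new InternalBroker model and only convert to the API DTO at the controller boundary. Below is a minimal sketch of the resulting flow; it assumes the MapStruct-generated toBrokerDto copies the matching fields, and that the API spec (not shown in this excerpt) adds corresponding bytesInPerSec/bytesOutPerSec properties to BrokerDTO.

```java
// Sketch only -- how the pieces introduced in this commit fit together.
// BrokerService now emits the internal model with per-broker I/O rates attached...
Flux<InternalBroker> brokers = brokerService.getBrokers(getCluster(clusterName));

// ...and the controller maps each entry to the API DTO, so the rates travel
// with every broker returned by the brokers endpoint.
Flux<BrokerDTO> response = brokers.map(clusterMapper::toBrokerDto);
```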
@@ -37,7 +37,8 @@ public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterNa
@Override
public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(brokerService.getBrokers(getCluster(clusterName))));
return Mono.just(ResponseEntity.ok(
brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto)));
}

@Override
@@ -2,6 +2,7 @@

import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.model.BrokerConfigDTO;
import com.provectus.kafka.ui.model.BrokerDTO;
import com.provectus.kafka.ui.model.BrokerDiskUsageDTO;
import com.provectus.kafka.ui.model.BrokerMetricsDTO;
import com.provectus.kafka.ui.model.ClusterDTO;
@@ -14,6 +15,7 @@
import com.provectus.kafka.ui.model.ConnectDTO;
import com.provectus.kafka.ui.model.FailoverUrlList;
import com.provectus.kafka.ui.model.Feature;
import com.provectus.kafka.ui.model.InternalBroker;
import com.provectus.kafka.ui.model.InternalBrokerConfig;
import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
import com.provectus.kafka.ui.model.InternalClusterState;
@@ -103,6 +105,8 @@ default ConfigSynonymDTO toConfigSynonym(ConfigEntry.ConfigSynonym config) {

PartitionDTO toPartition(InternalPartition topic);

BrokerDTO toBrokerDto(InternalBroker broker);

@Named("setSchemaRegistry")
default InternalSchemaRegistry setSchemaRegistry(ClustersProperties.Cluster clusterProperties) {
if (clusterProperties == null
@@ -0,0 +1,24 @@
package com.provectus.kafka.ui.model;

import java.math.BigDecimal;
import lombok.Data;
import org.apache.kafka.common.Node;

@Data
public class InternalBroker {

private final Integer id;
private final String host;
private final Integer port;
private final BigDecimal bytesInPerSec;
private final BigDecimal bytesOutPerSec;

public InternalBroker(Node node, Statistics statistics) {
this.id = node.id();
this.host = node.host();
this.port = node.port();
this.bytesInPerSec = statistics.getMetrics().getBrokerBytesInPerSec().get(node.id());
this.bytesOutPerSec = statistics.getMetrics().getBrokerBytesOutPerSec().get(node.id());
}

}
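A small usage sketch for the new model (hypothetical fixtures; emptyStatistics is assumed to wrap Metrics.empty()): the constructor looks the rates up by node id, so brokers without collected metrics simply end up with null rates rather than failing.

```java
// Sketch, with hypothetical fixtures: emptyStatistics is assumed to wrap Metrics.empty().
Node node = new Node(1, "broker-1", 9092);
InternalBroker broker = new InternalBroker(node, emptyStatistics);

// Map.get(..) on the empty per-broker maps returns null, so the rates are absent,
// not zero -- the Lombok @Data getters reflect that.
assert broker.getBytesInPerSec() == null;
assert broker.getBytesOutPerSec() == null;
```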
@@ -6,6 +6,7 @@
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.Data;
import org.apache.kafka.common.Node;

@Data
public class InternalClusterState {
@@ -37,7 +38,9 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) {
.orElse(null);
topicCount = statistics.getTopicDescriptions().size();
brokerCount = statistics.getClusterDescription().getNodes().size();
activeControllers = statistics.getClusterDescription().getController() != null ? 1 : 0;
activeControllers = Optional.ofNullable(statistics.getClusterDescription().getController())
.map(Node::id)
.orElse(null);
version = statistics.getVersion();

if (statistics.getLogDirInfo() != null) {
@@ -53,15 +56,17 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) {

bytesInPerSec = statistics
.getMetrics()
.getBytesInPerSec()
.getBrokerBytesInPerSec()
.values().stream()
.reduce(BigDecimal.ZERO, BigDecimal::add);
.reduce(BigDecimal::add)
.orElse(null);

bytesOutPerSec = statistics
.getMetrics()
.getBytesOutPerSec()
.getBrokerBytesOutPerSec()
.values().stream()
.reduce(BigDecimal.ZERO, BigDecimal::add);
.reduce(BigDecimal::add)
.orElse(null);

var partitionsStats = new PartitionsStats(statistics.getTopicDescriptions().values());
onlinePartitionCount = partitionsStats.getOnlinePartitionCount();
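The switch from reduce with an identity value to the Optional-returning overload changes what the API reports when no broker metrics have been collected: previously the cluster-wide rate showed up as 0, now it is null (i.e. absent). A self-contained illustration of the difference, independent of the kafka-ui classes:

```java
import java.math.BigDecimal;
import java.util.Map;

public class ReduceSemanticsDemo {
  public static void main(String[] args) {
    // No per-broker rates collected yet (e.g. metrics scraping disabled).
    Map<Integer, BigDecimal> brokerBytesInPerSec = Map.of();

    // Old behaviour: an identity value means the sum is ZERO even with no data.
    BigDecimal oldValue = brokerBytesInPerSec.values().stream()
        .reduce(BigDecimal.ZERO, BigDecimal::add);

    // New behaviour: without an identity the result is an empty Optional, mapped to null,
    // which lets the UI distinguish "no metrics" from a genuine 0 bytes/sec.
    BigDecimal newValue = brokerBytesInPerSec.values().stream()
        .reduce(BigDecimal::add)
        .orElse(null);

    System.out.println(oldValue + " vs " + newValue); // prints "0 vs null"
  }
}
```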
@@ -102,8 +102,8 @@ public static InternalTopic from(TopicDescription topicDescription,
topic.segmentSize(segmentStats.getSegmentSize());
}

topic.bytesInPerSec(metrics.getBytesInPerSec().get(topicDescription.name()));
topic.bytesOutPerSec(metrics.getBytesOutPerSec().get(topicDescription.name()));
topic.bytesInPerSec(metrics.getTopicBytesInPerSec().get(topicDescription.name()));
topic.bytesOutPerSec(metrics.getTopicBytesOutPerSec().get(topicDescription.name()));

topic.topicConfigs(
configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));
@@ -15,14 +15,19 @@
@Builder
@Value
public class Metrics {
Map<String, BigDecimal> bytesInPerSec;
Map<String, BigDecimal> bytesOutPerSec;

Map<Integer, BigDecimal> brokerBytesInPerSec;
Map<Integer, BigDecimal> brokerBytesOutPerSec;
Map<String, BigDecimal> topicBytesInPerSec;
Map<String, BigDecimal> topicBytesOutPerSec;
Map<Integer, List<RawMetric>> perBrokerMetrics;

public static Metrics empty() {
return Metrics.builder()
.bytesInPerSec(Map.of())
.bytesOutPerSec(Map.of())
.brokerBytesInPerSec(Map.of())
.brokerBytesOutPerSec(Map.of())
.topicBytesInPerSec(Map.of())
.topicBytesOutPerSec(Map.of())
.perBrokerMetrics(Map.of())
.build();
}
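The renamed builder fields make the key types explicit: broker rates are keyed by broker id, topic rates by topic name. A minimal usage sketch of the Lombok builder shown above (illustrative values only):

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;

// Sketch: building a Metrics snapshot with both per-broker and per-topic rates.
Metrics metrics = Metrics.builder()
    .brokerBytesInPerSec(Map.of(1, new BigDecimal("1048576")))     // broker id -> bytes/s
    .brokerBytesOutPerSec(Map.of(1, new BigDecimal("524288")))
    .topicBytesInPerSec(Map.of("orders", new BigDecimal("2048")))  // topic name -> bytes/s
    .topicBytesOutPerSec(Map.of("orders", new BigDecimal("1024")))
    .perBrokerMetrics(Map.of(1, List.of()))
    .build();
```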
@@ -5,9 +5,9 @@
import com.provectus.kafka.ui.exception.NotFoundException;
import com.provectus.kafka.ui.exception.TopicOrPartitionNotFoundException;
import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper;
import com.provectus.kafka.ui.model.BrokerDTO;
import com.provectus.kafka.ui.model.BrokerLogdirUpdateDTO;
import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
import com.provectus.kafka.ui.model.InternalBroker;
import com.provectus.kafka.ui.model.InternalBrokerConfig;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.metrics.RawMetric;
@@ -63,18 +63,13 @@ private Flux<InternalBrokerConfig> getBrokersConfig(KafkaCluster cluster, Intege
.flatMapMany(Flux::fromIterable);
}

public Flux<BrokerDTO> getBrokers(KafkaCluster cluster) {
public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
return adminClientService
.get(cluster)
.flatMap(ReactiveAdminClient::describeCluster)
.map(description -> description.getNodes().stream()
.map(node -> {
BrokerDTO broker = new BrokerDTO();
broker.setId(node.id());
broker.setHost(node.host());
broker.setPort(node.port());
return broker;
}).collect(Collectors.toList()))
.map(node -> new InternalBroker(node, statisticsCache.get(cluster)))
.collect(Collectors.toList()))
.flatMapMany(Flux::fromIterable);
}

@@ -1,6 +1,5 @@
package com.provectus.kafka.ui.service;

import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
@@ -43,6 +42,7 @@
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
@@ -53,6 +53,7 @@
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
@@ -125,7 +126,8 @@ private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String
}
}

//TODO: discuss - maybe we should map kafka-library's exceptions to our exceptions here
// NOTE: if KafkaFuture returns null, that Mono will be empty(!), since Reactor does not support nullable results
// (see MonoSink.success(..) javadoc for details)
private static <T> Mono<T> toMono(KafkaFuture<T> future) {
return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
if (ex != null) {
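The new comment is the key to the rewritten describeClusterImpl below: toMono(allOfFuture) only signals completion, and the subsequent fromCallable reads the already-completed futures. A self-contained demo of the null-to-empty behaviour the note describes (the method body is re-sketched here, since the excerpt above is truncated):

```java
import org.apache.kafka.common.KafkaFuture;
import reactor.core.publisher.Mono;

public class ToMonoNullDemo {

  // Same shape as ReactiveAdminClient#toMono: MonoSink.success(null) completes the Mono empty.
  static <T> Mono<T> toMono(KafkaFuture<T> future) {
    return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
      if (ex != null) {
        sink.error(ex);
      } else {
        sink.success(res); // res == null -> empty Mono, not an error
      }
    }));
  }

  public static void main(String[] args) {
    KafkaFuture<String> completedWithNull = KafkaFuture.completedFuture(null);
    toMono(completedWithNull)
        .defaultIfEmpty("future completed with null -> Mono was empty")
        .subscribe(System.out::println);
  }
}
```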
@@ -302,26 +304,19 @@ public Mono<ClusterDescription> describeCluster() {
}

private static Mono<ClusterDescription> describeClusterImpl(AdminClient client) {
var r = client.describeCluster();
var all = KafkaFuture.allOf(r.nodes(), r.clusterId(), r.controller(), r.authorizedOperations());
return Mono.create(sink -> all.whenComplete((res, ex) -> {
if (ex != null) {
sink.error(ex);
} else {
try {
sink.success(
new ClusterDescription(
getUninterruptibly(r.controller()),
getUninterruptibly(r.clusterId()),
getUninterruptibly(r.nodes()),
getUninterruptibly(r.authorizedOperations())
)
);
} catch (ExecutionException e) {
// can't be here, because all futures already completed
}
}
}));
var result = client.describeCluster(new DescribeClusterOptions().includeAuthorizedOperations(true));
var allOfFuture = KafkaFuture.allOf(
result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations());
return toMono(allOfFuture).then(
Mono.fromCallable(() ->
new ClusterDescription(
result.controller().get(),
result.clusterId().get(),
result.nodes().get(),
result.authorizedOperations().get()
)
)
);
}

private static Mono<String> getClusterVersion(AdminClient client) {
@@ -62,7 +62,7 @@ private WellKnownMetrics populateWellknowMetrics(KafkaCluster cluster, Map<Node,
WellKnownMetrics wellKnownMetrics = new WellKnownMetrics();
perBrokerMetrics.forEach((node, metrics) ->
metrics.forEach(metric ->
wellKnownMetrics.populate(cluster, node, metric)));
wellKnownMetrics.populate(node, metric)));
return wellKnownMetrics;
}

@@ -3,7 +3,6 @@
import static org.apache.commons.lang3.StringUtils.containsIgnoreCase;
import static org.apache.commons.lang3.StringUtils.endsWithIgnoreCase;

import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Metrics;
import java.math.BigDecimal;
import java.util.HashMap;
@@ -12,16 +11,42 @@

class WellKnownMetrics {

// per broker
final Map<Integer, BigDecimal> brokerBytesInFifteenMinuteRate = new HashMap<>();
final Map<Integer, BigDecimal> brokerBytesOutFifteenMinuteRate = new HashMap<>();

// per topic
final Map<String, BigDecimal> bytesInFifteenMinuteRate = new HashMap<>();
final Map<String, BigDecimal> bytesOutFifteenMinuteRate = new HashMap<>();

void populate(KafkaCluster cluster, Node node, RawMetric rawMetric) {
void populate(Node node, RawMetric rawMetric) {
updateBrokerIOrates(node, rawMetric);
updateTopicsIOrates(rawMetric);
}

void apply(Metrics.MetricsBuilder metricsBuilder) {
metricsBuilder.bytesInPerSec(bytesInFifteenMinuteRate);
metricsBuilder.bytesOutPerSec(bytesOutFifteenMinuteRate);
metricsBuilder.topicBytesInPerSec(bytesInFifteenMinuteRate);
metricsBuilder.topicBytesOutPerSec(bytesOutFifteenMinuteRate);
metricsBuilder.brokerBytesInPerSec(brokerBytesInFifteenMinuteRate);
metricsBuilder.brokerBytesOutPerSec(brokerBytesOutFifteenMinuteRate);
}

private void updateBrokerIOrates(Node node, RawMetric rawMetric) {
String name = rawMetric.name();
if (!brokerBytesInFifteenMinuteRate.containsKey(node.id())
&& rawMetric.labels().size() == 1
&& "BytesInPerSec".equalsIgnoreCase(rawMetric.labels().get("name"))
&& containsIgnoreCase(name, "BrokerTopicMetrics")
&& endsWithIgnoreCase(name, "FifteenMinuteRate")) {
brokerBytesInFifteenMinuteRate.put(node.id(), rawMetric.value());
}
if (!brokerBytesOutFifteenMinuteRate.containsKey(node.id())
&& rawMetric.labels().size() == 1
&& "BytesOutPerSec".equalsIgnoreCase(rawMetric.labels().get("name"))
&& containsIgnoreCase(name, "BrokerTopicMetrics")
&& endsWithIgnoreCase(name, "FifteenMinuteRate")) {
brokerBytesOutFifteenMinuteRate.put(node.id(), rawMetric.value());
}
}

private void updateTopicsIOrates(RawMetric rawMetric) {
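For reference, a hypothetical raw metric that would pass the three checks in updateBrokerIOrates above. The exact metric name depends on the JMX/Prometheus exporter in use; what matters is a single name=BytesInPerSec (or BytesOutPerSec) label plus a name that contains BrokerTopicMetrics and ends in FifteenMinuteRate. It also assumes RawMetric exposes a create(...) factory or an equivalent constructor:

```java
import java.math.BigDecimal;
import java.util.Map;
import org.apache.kafka.common.Node;

// Illustrative values only -- not taken from the commit.
Node broker = new Node(1, "broker-1", 9092);
String name = "kafka_server_BrokerTopicMetrics_FifteenMinuteRate";    // assumed exporter naming
Map<String, String> labels = Map.of("name", "BytesInPerSec");          // exactly one label
BigDecimal fifteenMinuteRate = new BigDecimal("1048576");

// wellKnownMetrics.populate(broker, RawMetric.create(name, labels, fifteenMinuteRate));
// -> brokerBytesInFifteenMinuteRate would then map broker id 1 to 1048576 bytes/s.
```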
@@ -1,7 +1,6 @@
package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.AbstractIntegrationTest;
import com.provectus.kafka.ui.model.BrokerDTO;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import reactor.test.StepVerifier;
@@ -16,14 +15,11 @@ class BrokerServiceTest extends AbstractIntegrationTest {

@Test
void getBrokersReturnsFilledBrokerDto() {
BrokerDTO expectedBroker = new BrokerDTO();
expectedBroker.setId(1);
expectedBroker.setHost(kafka.getHost());
expectedBroker.setPort(kafka.getFirstMappedPort());

var localCluster = clustersStorage.getClusterByName(LOCAL).get();
StepVerifier.create(brokerService.getBrokers(localCluster))
.expectNext(expectedBroker)
.expectNextMatches(b -> b.getId().equals(1)
&& b.getHost().equals(kafka.getHost())
&& b.getPort().equals(kafka.getFirstMappedPort()))
.verifyComplete();
}

