diff --git a/src/main/java/kafdrop/controller/ConsumerController.java b/src/main/java/kafdrop/controller/ConsumerController.java index 4996f44c..3b11ddaf 100644 --- a/src/main/java/kafdrop/controller/ConsumerController.java +++ b/src/main/java/kafdrop/controller/ConsumerController.java @@ -37,11 +37,8 @@ public ConsumerController(KafkaMonitor kafkaMonitor) { @RequestMapping("/{groupId:.+}") public String consumerDetail(@PathVariable("groupId") String groupId, Model model) throws ConsumerNotFoundException { - final var topicVos = kafkaMonitor.getTopics(); - final var consumer = kafkaMonitor.getConsumers(topicVos) - .stream() - .filter(c -> c.getGroupId().equals(groupId)) - .findAny(); + final var consumer = kafkaMonitor.getConsumersByGroup(groupId).stream().findAny(); + model.addAttribute("consumer", consumer.orElseThrow(() -> new ConsumerNotFoundException(groupId))); return "consumer-detail"; } @@ -53,11 +50,8 @@ public String consumerDetail(@PathVariable("groupId") String groupId, Model mode }) @GetMapping(path = "/{groupId:.+}", produces = MediaType.APPLICATION_JSON_VALUE) public @ResponseBody ConsumerVO getConsumer(@PathVariable("groupId") String groupId) throws ConsumerNotFoundException { - final var topicVos = kafkaMonitor.getTopics(); - final var consumer = kafkaMonitor.getConsumers(topicVos) - .stream() - .filter(c -> c.getGroupId().equals(groupId)) - .findAny(); + final var consumer = kafkaMonitor.getConsumersByGroup(groupId).stream().findAny(); + return consumer.orElseThrow(() -> new ConsumerNotFoundException(groupId)); } } \ No newline at end of file diff --git a/src/main/java/kafdrop/controller/TopicController.java b/src/main/java/kafdrop/controller/TopicController.java index ba9ed7c4..09a946c0 100644 --- a/src/main/java/kafdrop/controller/TopicController.java +++ b/src/main/java/kafdrop/controller/TopicController.java @@ -60,7 +60,7 @@ public String topicDetails(@PathVariable("name") String topicName, Model model) final var topic = kafkaMonitor.getTopic(topicName) .orElseThrow(() -> new TopicNotFoundException(topicName)); model.addAttribute("topic", topic); - model.addAttribute("consumers", kafkaMonitor.getConsumers(Collections.singleton(topic))); + model.addAttribute("consumers", kafkaMonitor.getConsumersByTopics(Collections.singleton(topic))); model.addAttribute("topicDeleteEnabled", topicDeleteEnabled); model.addAttribute("keyFormat", defaultKeyFormat); model.addAttribute("format", defaultFormat); @@ -125,7 +125,7 @@ public String createTopicPage(Model model) { public @ResponseBody List getConsumers(@PathVariable("name") String topicName) { final var topic = kafkaMonitor.getTopic(topicName) .orElseThrow(() -> new TopicNotFoundException(topicName)); - return kafkaMonitor.getConsumers(Collections.singleton(topic)); + return kafkaMonitor.getConsumersByTopics(Collections.singleton(topic)); } /** diff --git a/src/main/java/kafdrop/model/TopicVO.java b/src/main/java/kafdrop/model/TopicVO.java index f9467217..3055d9f0 100644 --- a/src/main/java/kafdrop/model/TopicVO.java +++ b/src/main/java/kafdrop/model/TopicVO.java @@ -44,10 +44,6 @@ public void setConfig(Map config) { this.config = config; } - public Map getPartitionMap() { - return Collections.unmodifiableMap(partitions); - } - public Collection getPartitions() { return Collections.unmodifiableCollection(partitions.values()); } diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index b3ddc6e9..458df364 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -3,6 +3,7 @@ import kafdrop.config.*; import kafdrop.model.*; import kafdrop.util.*; +import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.Node; import org.apache.kafka.common.*; @@ -10,11 +11,11 @@ import org.slf4j.*; import org.springframework.stereotype.*; -import javax.annotation.*; -import java.nio.*; -import java.time.*; +import javax.annotation.PostConstruct; +import java.nio.ByteBuffer; +import java.time.Duration; import java.util.*; -import java.util.stream.*; +import java.util.stream.Collectors; @Service public final class KafkaHighLevelConsumer { @@ -47,36 +48,37 @@ private void initializeClient() { } } - synchronized Map getPartitionSize(String topic) { + synchronized void setTopicPartitionSizes(List topics) { initializeClient(); - final var partitionInfoSet = kafkaConsumer.partitionsFor(topic); - kafkaConsumer.assign(partitionInfoSet.stream() - .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), - partitionInfo.partition())) - .collect(Collectors.toList())); - - kafkaConsumer.poll(Duration.ofMillis(0)); - final Set assignedPartitionList = kafkaConsumer.assignment(); - final TopicVO topicVO = getTopicInfo(topic); - final Map partitionsVo = topicVO.getPartitionMap(); - - kafkaConsumer.seekToBeginning(assignedPartitionList); - assignedPartitionList.forEach(topicPartition -> { - final TopicPartitionVO topicPartitionVo = partitionsVo.get(topicPartition.partition()); - final long startOffset = kafkaConsumer.position(topicPartition); - LOG.debug("topic: {}, partition: {}, startOffset: {}", topicPartition.topic(), topicPartition.partition(), startOffset); - topicPartitionVo.setFirstOffset(startOffset); - }); - - kafkaConsumer.seekToEnd(assignedPartitionList); - assignedPartitionList.forEach(topicPartition -> { - final long latestOffset = kafkaConsumer.position(topicPartition); - LOG.debug("topic: {}, partition: {}, latestOffset: {}", topicPartition.topic(), topicPartition.partition(), latestOffset); - final TopicPartitionVO partitionVo = partitionsVo.get(topicPartition.partition()); - partitionVo.setSize(latestOffset); - }); - return partitionsVo; + Map> allTopics = topics.stream().map(topicVO -> { + List topicPartitions = topicVO.getPartitions().stream().map(topicPartitionVO -> + new TopicPartition(topicVO.getName(), topicPartitionVO.getId()) + ).collect(Collectors.toList()); + + return Pair.of(topicVO, topicPartitions); + }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + List allTopicPartitions = allTopics.values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + + kafkaConsumer.assign(allTopicPartitions); + Map beginningOffset = kafkaConsumer.beginningOffsets(allTopicPartitions); + Map endOffsets = kafkaConsumer.endOffsets(allTopicPartitions); + + allTopics.forEach((topicVO, topicPartitions) -> topicPartitions.forEach(topicPartition -> { + Optional partition = topicVO.getPartition(topicPartition.partition()); + + partition.ifPresent(p -> { + Long startOffset = beginningOffset.get(topicPartition); + Long endOffset = endOffsets.get(topicPartition); + + LOG.debug("topic: {}, partition: {}, startOffset: {}, endOffset: {}", topicPartition.topic(), topicPartition.partition(), startOffset, endOffset); + p.setFirstOffset(startOffset); + p.setSize(endOffset); + }); + })); } /** @@ -195,25 +197,27 @@ private static String deserialize(MessageDeserializer deserializer, byte[] bytes return bytes != null ? deserializer.deserializeMessage(ByteBuffer.wrap(bytes)) : "empty"; } - synchronized Map getTopicInfos(String[] topics) { + synchronized Map> getAllTopics() { + initializeClient(); + + return kafkaConsumer.listTopics(); + } + + synchronized Map getTopicInfos(Map> allTopicsMap, String[] topics) { initializeClient(); - final var topicSet = kafkaConsumer.listTopics().keySet(); + + final var topicSet = allTopicsMap.keySet(); if (topics.length == 0) { topics = Arrays.copyOf(topicSet.toArray(), topicSet.size(), String[].class); } - final var topicVos = new HashMap(topics.length, 1f); - for (var topic : topics) { - if (topicSet.contains(topic)) { - topicVos.put(topic, getTopicInfo(topic)); - } - } - - return topicVos; - } + return Arrays.stream(topics) + .filter(topicSet::contains) + .map(topic -> Pair.of(topic, getTopicInfo(topic, allTopicsMap.get(topic)))) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + } - private TopicVO getTopicInfo(String topic) { - final var partitionInfoList = kafkaConsumer.partitionsFor(topic); + private TopicVO getTopicInfo(String topic, List partitionInfoList) { final var topicVo = new TopicVO(topic); final var partitions = new TreeMap(); diff --git a/src/main/java/kafdrop/service/KafkaMonitor.java b/src/main/java/kafdrop/service/KafkaMonitor.java index d4bcc000..137ac4f3 100644 --- a/src/main/java/kafdrop/service/KafkaMonitor.java +++ b/src/main/java/kafdrop/service/KafkaMonitor.java @@ -44,7 +44,9 @@ List getMessages(TopicPartition topicPartition, long offset, int coun ClusterSummaryVO getClusterSummary(Collection topics); - List getConsumers(Collection topicVos); + List getConsumersByGroup(String groupId); + + List getConsumersByTopics(Collection topicVos); /** * Create topic diff --git a/src/main/java/kafdrop/service/KafkaMonitorImpl.java b/src/main/java/kafdrop/service/KafkaMonitorImpl.java index 79038e9b..ab87593a 100644 --- a/src/main/java/kafdrop/service/KafkaMonitorImpl.java +++ b/src/main/java/kafdrop/service/KafkaMonitorImpl.java @@ -99,24 +99,31 @@ public ClusterSummaryVO getClusterSummary(Collection topics) { @Override public List getTopics() { - final var topicVos = getTopicMetadata().values().stream() + final var topicVos = getTopicMetadata(highLevelConsumer.getAllTopics()).values().stream() .sorted(Comparator.comparing(TopicVO::getName)) .collect(Collectors.toList()); - for (var topicVo : topicVos) { - topicVo.setPartitions(getTopicPartitionSizes(topicVo)); - } + + return topicVos; + } + + public List getTopics(String[] topics) { + Map> topicsMap = highLevelConsumer.getAllTopics(); + + ArrayList topicVos = new ArrayList<>(getTopicMetadata(topicsMap, topics).values()); + setTopicPartitionSizes(topicVos); + return topicVos; } @Override public Optional getTopic(String topic) { - final var topicVo = Optional.ofNullable(getTopicMetadata(topic).get(topic)); - topicVo.ifPresent(vo -> vo.setPartitions(getTopicPartitionSizes(vo))); - return topicVo; + String[] topics = { topic }; + + return getTopics(topics).stream().findAny(); } - private Map getTopicMetadata(String... topics) { - final var topicInfos = highLevelConsumer.getTopicInfos(topics); + private Map getTopicMetadata(Map> allTopicsMap, String... topics) { + final var topicInfos = highLevelConsumer.getTopicInfos(allTopicsMap, topics); final var retrievedTopicNames = topicInfos.keySet(); final var topicConfigs = highLevelAdminClient.describeTopicConfigs(retrievedTopicNames); @@ -191,12 +198,29 @@ private static Map headersToMap(Headers headers) { return map; } - private Map getTopicPartitionSizes(TopicVO topic) { - return highLevelConsumer.getPartitionSize(topic.getName()); + private void setTopicPartitionSizes(List topics) { + highLevelConsumer.setTopicPartitionSizes(topics); } @Override - public List getConsumers(Collection topicVos) { + public List getConsumersByGroup(String groupId) { + List consumerGroupOffsets = getConsumerOffsets(groupId); + + String[] uniqueTopicNames = consumerGroupOffsets.stream() + .flatMap(consumerGroupOffset -> consumerGroupOffset.offsets.keySet() + .stream().map(TopicPartition::topic)) + .distinct() + .toArray(String[]::new); + + List topicVOs = getTopics(uniqueTopicNames); + + LOG.debug("consumerGroupOffsets: {}", consumerGroupOffsets); + LOG.debug("topicVos: {}", topicVOs); + return convert(consumerGroupOffsets, topicVOs); + } + + @Override + public List getConsumersByTopics(Collection topicVos) { final var topics = topicVos.stream().map(TopicVO::getName).collect(Collectors.toSet()); final var consumerGroupOffsets = getConsumerOffsets(topics); LOG.debug("consumerGroupOffsets: {}", consumerGroupOffsets); @@ -308,6 +332,10 @@ private ConsumerGroupOffsets resolveOffsets(String groupId) { return new ConsumerGroupOffsets(groupId, highLevelAdminClient.listConsumerGroupOffsetsIfAuthorized(groupId)); } + private List getConsumerOffsets(String groupId) { + return Collections.singletonList(resolveOffsets(groupId)); + } + private List getConsumerOffsets(Set topics) { final var consumerGroups = highLevelAdminClient.listConsumerGroups(); return consumerGroups.stream()