From 7ca2a074d9ff27b531e8e0193e67c89393d45419 Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Fri, 18 Oct 2024 11:18:21 -0400 Subject: [PATCH] Obtain topic/partition overview counts from topics API, not metrics Signed-off-by: Michael Edgar --- .../streamshub/console/api/model/Topic.java | 2 +- .../console/api/service/TopicService.java | 12 ++++-- ui/api/kafka/actions.ts | 19 +++++---- ui/api/kafka/kpi.promql.ts | 42 ------------------- ui/api/kafka/schema.ts | 3 -- ui/api/topics/actions.ts | 6 ++- ui/api/topics/schema.ts | 15 ++++--- .../@header/topics/[topicId]/TopicHeader.tsx | 2 +- .../ConnectedTopicsPartitionsCard.tsx | 25 ++++++----- .../kafka/[kafkaId]/overview/page.tsx | 5 ++- .../topics/[topicId]/messages/page.tsx | 2 +- .../ClusterOverview/TopicsPartitionsCard.tsx | 12 +++--- ui/components/TopicsTable/TopicsTable.tsx | 6 +-- ui/messages/en.json | 1 + 14 files changed, 62 insertions(+), 90 deletions(-) diff --git a/api/src/main/java/com/github/streamshub/console/api/model/Topic.java b/api/src/main/java/com/github/streamshub/console/api/model/Topic.java index ff9988c7a..113cf298a 100644 --- a/api/src/main/java/com/github/streamshub/console/api/model/Topic.java +++ b/api/src/main/java/com/github/streamshub/console/api/model/Topic.java @@ -127,7 +127,7 @@ public SingleResponse(Topic data) { } @JsonFilter("fieldFilter") - static class Attributes { + public static class Attributes { @JsonProperty String name; diff --git a/api/src/main/java/com/github/streamshub/console/api/service/TopicService.java b/api/src/main/java/com/github/streamshub/console/api/service/TopicService.java index b431d3fd8..2f4882688 100644 --- a/api/src/main/java/com/github/streamshub/console/api/service/TopicService.java +++ b/api/src/main/java/com/github/streamshub/console/api/service/TopicService.java @@ -14,6 +14,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.Predicate; import java.util.regex.Pattern; @@ -163,7 +164,11 @@ public CompletionStage> listTopics(List fields, String offse Admin adminClient = kafkaContext.admin(); final Map statuses = new HashMap<>(); - listSupport.meta().put("summary", Map.of("statuses", statuses)); + final AtomicInteger partitionCount = new AtomicInteger(0); + + listSupport.meta().put("summary", Map.of( + "statuses", statuses, + "totalPartitions", partitionCount)); return listTopics(adminClient, true) .thenApply(list -> list.stream().map(Topic::fromTopicListing).toList()) @@ -172,7 +177,7 @@ public CompletionStage> listTopics(List fields, String offse threadContext.currentContextExecutor()) .thenApply(list -> list.stream() .filter(listSupport) - .map(topic -> tallyStatus(statuses, topic)) + .map(topic -> tallySummary(statuses, partitionCount, topic)) .map(listSupport::tally) .filter(listSupport::betweenCursors) .sorted(listSupport.getSortComparator()) @@ -183,8 +188,9 @@ public CompletionStage> listTopics(List fields, String offse threadContext.currentContextExecutor()); } - Topic tallyStatus(Map statuses, Topic topic) { + Topic tallySummary(Map statuses, AtomicInteger partitionCount, Topic topic) { statuses.compute(topic.status(), (k, v) -> v == null ? 1 : v + 1); + partitionCount.addAndGet(topic.getAttributes().numPartitions()); return topic; } diff --git a/ui/api/kafka/actions.ts b/ui/api/kafka/actions.ts index bdac85f4a..916c54d63 100644 --- a/ui/api/kafka/actions.ts +++ b/ui/api/kafka/actions.ts @@ -89,7 +89,10 @@ export async function getKafkaClusterKpis( } if (!prom || !cluster.attributes.namespace) { - log.warn({ clusterId }, "getKafkaClusterKpis Prometheus unavailable"); + log.warn({ clusterId }, "getKafkaClusterKpis: " + + (!cluster.attributes.namespace + ? "Kafka cluster namespace not available" + : "Prometheus not configured or client error")); return { cluster, kpis: null }; } @@ -98,7 +101,6 @@ export async function getKafkaClusterKpis( values( cluster.attributes.namespace, cluster.attributes.name, - cluster.attributes.controller.id, cluster.attributes.nodePools?.join("|") ?? "", ), ); @@ -155,9 +157,6 @@ export async function getKafkaClusterKpis( "1": 3, "2": 3 }, - "total_topics": 5, - "total_partitions": 55, - "underreplicated_topics": 0, "replica_count": { "byNode": { "0": 57, @@ -253,7 +252,10 @@ export async function getKafkaClusterMetrics( } if (!prom || !cluster.attributes.namespace) { - log.warn({ clusterId }, "getKafkaClusterKpis Prometheus unavailable"); + log.warn({ clusterId }, "getKafkaClusterMetrics: " + + (!cluster.attributes.namespace + ? "Kafka cluster namespace not available" + : "Prometheus not configured or client error")); return { cluster, ranges: null }; } @@ -328,7 +330,10 @@ export async function getKafkaTopicMetrics( try { if (!prom || !cluster.attributes.namespace) { - log.warn({ clusterId }, "getKafkaClusterKpis Prometheus unavailable"); + log.warn({ clusterId }, "getKafkaTopicMetrics: " + + (!cluster.attributes.namespace + ? "Kafka cluster namespace not available" + : "Prometheus not configured or client error")); return { cluster, ranges: null }; } diff --git a/ui/api/kafka/kpi.promql.ts b/ui/api/kafka/kpi.promql.ts index 14764f96b..b935f1f52 100644 --- a/ui/api/kafka/kpi.promql.ts +++ b/ui/api/kafka/kpi.promql.ts @@ -1,7 +1,6 @@ export const values = ( namespace: string, cluster: string, - controller: number, nodePools: string, ) => ` sum by (__console_metric_name__, nodeId) ( @@ -22,47 +21,6 @@ sum by (__console_metric_name__, nodeId) ( or -sum by (__console_metric_name__) ( - label_replace( - kafka_controller_kafkacontroller_globaltopiccount{namespace="${namespace}",pod=~"${cluster}-.+-${controller}",strimzi_io_kind="Kafka"} > 0, - "__console_metric_name__", - "total_topics", - "", - "" - ) -) - -or - -sum by (__console_metric_name__) ( - label_replace( - kafka_controller_kafkacontroller_globalpartitioncount{namespace="${namespace}",pod=~"${cluster}-.+-${controller}",strimzi_io_kind="Kafka"} > 0, - "__console_metric_name__", - "total_partitions", - "", - "" - ) -) - -or - -label_replace( - ( - count( - sum by (topic) ( - kafka_cluster_partition_underreplicated{namespace="${namespace}",pod=~"${cluster}-.+-\\\\d+",strimzi_io_kind="Kafka"} > 0 - ) - ) - OR on() vector(0) - ), - "__console_metric_name__", - "underreplicated_topics", - "", - "" -) - -or - sum by (__console_metric_name__, nodeId) ( label_replace( label_replace( diff --git a/ui/api/kafka/schema.ts b/ui/api/kafka/schema.ts index 7f97f96cf..95e430ee0 100644 --- a/ui/api/kafka/schema.ts +++ b/ui/api/kafka/schema.ts @@ -83,9 +83,6 @@ export type ClusterDetail = z.infer; export const ClusterKpisSchema = z.object({ broker_state: z.record(z.number()).optional(), - total_topics: z.number().optional(), - total_partitions: z.number().optional(), - underreplicated_topics: z.number().optional(), replica_count: z .object({ byNode: z.record(z.number()).optional(), diff --git a/ui/api/topics/actions.ts b/ui/api/topics/actions.ts index 945c5779a..70f0bfdcb 100644 --- a/ui/api/topics/actions.ts +++ b/ui/api/topics/actions.ts @@ -25,6 +25,7 @@ export async function getTopics( params: { name?: string; id?: string; + fields?: string; status?: TopicStatus[]; pageSize?: number; pageCursor?: string; @@ -36,7 +37,7 @@ export async function getTopics( const sp = new URLSearchParams( filterUndefinedFromObj({ "fields[topics]": - "name,status,visibility,numPartitions,totalLeaderLogBytes,consumerGroups", + params.fields ?? "name,status,visibility,numPartitions,totalLeaderLogBytes,consumerGroups", "filter[id]": params.id ? `eq,${params.id}` : undefined, "filter[name]": params.name ? `like,*${params.name}*` : undefined, "filter[status]": @@ -210,7 +211,8 @@ export async function setTopicAsViewed(kafkaId: string, topicId: string) { kafkaId, kafkaName: cluster.attributes.name, topicId, - topicName: topic.attributes.name, + // name is included in the `fields[topics]` param list so we are sure it is present + topicName: topic.attributes.name!, }; if (viewedTopics.find((t) => t.topicId === viewedTopic.topicId)) { log.trace( diff --git a/ui/api/topics/schema.ts b/ui/api/topics/schema.ts index 1a1ef2081..ac8ebe79a 100644 --- a/ui/api/topics/schema.ts +++ b/ui/api/topics/schema.ts @@ -74,9 +74,9 @@ const TopicSchema = z.object({ managed: z.boolean().optional(), }).optional(), attributes: z.object({ - name: z.string(), - status: TopicStatusSchema, - visibility: z.string(), + name: z.string().optional(), + status: TopicStatusSchema.optional(), + visibility: z.string().optional(), partitions: z.array(PartitionSchema).optional(), numPartitions: z.number().optional(), authorizedOperations: z.array(z.string()), @@ -86,7 +86,7 @@ const TopicSchema = z.object({ relationships: z.object({ consumerGroups: z.object({ data: z.array(z.any()), - }), + }).optional(), }), }); export const TopicResponse = z.object({ @@ -109,10 +109,8 @@ const TopicListSchema = z.object({ numPartitions: true, totalLeaderLogBytes: true, }), - relationships: z.object({ - consumerGroups: z.object({ - data: z.array(z.any()), - }), + relationships: TopicSchema.shape.relationships.pick({ + consumerGroups: true }), }); export type TopicList = z.infer; @@ -129,6 +127,7 @@ export const TopicsResponseSchema = z.object({ PartiallyOffline: z.number().optional(), Offline: z.number().optional(), }), + totalPartitions: z.number(), }), }), links: z.object({ diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/@header/topics/[topicId]/TopicHeader.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/@header/topics/[topicId]/TopicHeader.tsx index ce7796acc..9f789a4c1 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/@header/topics/[topicId]/TopicHeader.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/@header/topics/[topicId]/TopicHeader.tsx @@ -135,7 +135,7 @@ async function ConnectedTopicHeader({ Consumer groups  diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedTopicsPartitionsCard.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedTopicsPartitionsCard.tsx index a0257fe81..4b8b6edea 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedTopicsPartitionsCard.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedTopicsPartitionsCard.tsx @@ -1,24 +1,29 @@ -import { ClusterDetail, ClusterKpis } from "@/api/kafka/schema"; +import { TopicsResponse } from "@/api/topics/schema"; import { TopicsPartitionsCard } from "@/components/ClusterOverview/TopicsPartitionsCard"; export async function ConnectedTopicsPartitionsCard({ data, }: { - data: Promise<{ cluster: ClusterDetail; kpis: ClusterKpis | null } | null>; + data: Promise; }) { - const res = await data; - if (!res?.kpis) { + const summary = (await data).meta.summary; + + if (!summary) { return null; } - const topicsTotal = res?.kpis.total_topics || 0; - const topicsUnderreplicated = res?.kpis.underreplicated_topics || 0; + + const totalPartitions = summary.totalPartitions; + const totalReplicated = summary.statuses.FullyReplicated ?? 0; + const totalUnderReplicated = (summary.statuses.UnderReplicated ?? 0) + (summary.statuses.PartiallyOffline ?? 0); + const totalOffline = summary.statuses.Offline ?? 0; + return ( ); } diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/page.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/page.tsx index 62510c0fa..8753195b3 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/page.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/overview/page.tsx @@ -4,7 +4,7 @@ import { getKafkaClusterMetrics, getKafkaTopicMetrics, } from "@/api/kafka/actions"; -import { getViewedTopics } from "@/api/topics/actions"; +import { getTopics, getViewedTopics } from "@/api/topics/actions"; import { KafkaParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/kafka.params"; import { ConnectedClusterCard } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedClusterCard"; import { ConnectedClusterChartsCard } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/overview/ConnectedClusterChartsCard"; @@ -25,6 +25,7 @@ export default function OverviewPage({ params }: { params: KafkaParams }) { "outgoingByteRate", "incomingByteRate", ]); + const topics = getTopics(params.kafkaId, { fields: "status", pageSize: 1 }); const consumerGroups = getConsumerGroups(params.kafkaId, { fields: "state" }); const viewedTopics = getViewedTopics().then((topics) => topics.filter((t) => t.kafkaId === params.kafkaId), @@ -34,7 +35,7 @@ export default function OverviewPage({ params }: { params: KafkaParams }) { clusterOverview={ } - topicsPartitions={} + topicsPartitions={} clusterCharts={} topicCharts={} recentTopics={} diff --git a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/topics/[topicId]/messages/page.tsx b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/topics/[topicId]/messages/page.tsx index aa4af4eaf..b663d87f7 100644 --- a/ui/app/[locale]/(authorized)/kafka/[kafkaId]/topics/[topicId]/messages/page.tsx +++ b/ui/app/[locale]/(authorized)/kafka/[kafkaId]/topics/[topicId]/messages/page.tsx @@ -34,7 +34,7 @@ export default async function ConnectedMessagesPage({ diff --git a/ui/components/ClusterOverview/TopicsPartitionsCard.tsx b/ui/components/ClusterOverview/TopicsPartitionsCard.tsx index d87a26117..18eedf7a1 100644 --- a/ui/components/ClusterOverview/TopicsPartitionsCard.tsx +++ b/ui/components/ClusterOverview/TopicsPartitionsCard.tsx @@ -23,17 +23,17 @@ import { Link } from "@/i18n/routing"; import { useTranslations } from "next-intl"; type TopicsPartitionsCardProps = { - topicsTotal: number; topicsReplicated: number; topicsUnderReplicated: number; + topicsOffline: number; partitions: number; }; export function TopicsPartitionsCard({ isLoading, - topicsTotal, topicsReplicated, topicsUnderReplicated, + topicsOffline, partitions, }: | ({ isLoading: false } & TopicsPartitionsCardProps) @@ -70,7 +70,7 @@ export function TopicsPartitionsCard({ - {" "} + {" "} {t("ClusterOverview.total_topics")} @@ -81,7 +81,7 @@ export function TopicsPartitionsCard({ -   {t("ClusterOverview.partition")} +   {t("ClusterOverview.total_partitions")} @@ -163,9 +163,7 @@ export function TopicsPartitionsCard({ /> ) : ( )} diff --git a/ui/components/TopicsTable/TopicsTable.tsx b/ui/components/TopicsTable/TopicsTable.tsx index 7882fcd6e..c6684e7c7 100644 --- a/ui/components/TopicsTable/TopicsTable.tsx +++ b/ui/components/TopicsTable/TopicsTable.tsx @@ -184,7 +184,7 @@ export function TopicsTable({ return ( - + {row.meta?.managed === true && } @@ -192,7 +192,7 @@ export function TopicsTable({ case "status": return ( - {StatusLabel[row.attributes.status]} + {StatusLabel[row.attributes.status!]} ); case "consumerGroups": @@ -203,7 +203,7 @@ export function TopicsTable({ href={`${baseurl}/${row.id}/consumer-groups`} > diff --git a/ui/messages/en.json b/ui/messages/en.json index 7e5447255..f05493c57 100644 --- a/ui/messages/en.json +++ b/ui/messages/en.json @@ -494,6 +494,7 @@ "topic_header": "Topics", "total_topics": "total topics", "partition": "Partition", + "total_partitions": "total partitions", "view_all_topics": "View all", "fully_replicated_partition": "Fully replicated", "fully_replicated_partition_tooltip": "All partitions are fully replicated. A partition is fully-replicated when its replicas (followers) are 'in sync' with the designated partition leader. Replicas are 'in sync' if they have fetched records up to the log end offset of the leader partition within an allowable lag time, as determined by replica.lag.time.max.ms.",