From 600cc56b1f165b49ae482f47ffcdeb9781e20979 Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Fri, 19 Jan 2024 06:44:17 -0500 Subject: [PATCH] Map consumer group counts and Kafka conditions to cluster overview (#393) --- ui/api/consumerGroups/actions.ts | 4 +-- ui/api/consumerGroups/schema.ts | 10 +++---- ui/api/kafka/actions.ts | 6 ++++- ui/api/kafka/schema.ts | 9 +++++++ .../consumer-groups/ConsumerGroupsTable.tsx | 8 +++--- .../[groupId]/MembersTable.tsx | 20 +++++++------- .../kafka/[kafkaId]/overview/page.tsx | 27 ++++++++++++++++--- 7 files changed, 58 insertions(+), 26 deletions(-) diff --git a/ui/api/consumerGroups/actions.ts b/ui/api/consumerGroups/actions.ts index c5dd595bb..5ce5aab60 100644 --- a/ui/api/consumerGroups/actions.ts +++ b/ui/api/consumerGroups/actions.ts @@ -31,6 +31,7 @@ export async function getConsumerGroup( export async function getConsumerGroups( kafkaId: string, params: { + fields?: string; pageSize?: number; pageCursor?: string; sort?: string; @@ -39,8 +40,7 @@ export async function getConsumerGroups( ): Promise { const sp = new URLSearchParams( filterUndefinedFromObj({ - "fields[consumerGroups]": - "state,simpleConsumerGroup,members,offsets", + "fields[consumerGroups]": params.fields ?? "state,simpleConsumerGroup,members,offsets", // TODO: pass filter from UI "filter[state]": "in,STABLE,PREPARING_REBALANCE,COMPLETING_REBALANCE", "page[size]": params.pageSize, diff --git a/ui/api/consumerGroups/schema.ts b/ui/api/consumerGroups/schema.ts index d4ccb96e5..f7d26d2c7 100644 --- a/ui/api/consumerGroups/schema.ts +++ b/ui/api/consumerGroups/schema.ts @@ -22,19 +22,19 @@ const MemberDescriptionSchema = z.object({ groupInstanceId: z.string().nullable().optional(), clientId: z.string(), host: z.string(), - assignments: z.array(PartitionKeySchema), + assignments: z.array(PartitionKeySchema).optional(), }); export const ConsumerGroupSchema = z.object({ id: z.string(), type: z.literal("consumerGroups"), attributes: z.object({ - simpleConsumerGroup: z.boolean(), - state: z.string(), - members: z.array(MemberDescriptionSchema), + simpleConsumerGroup: z.boolean().optional(), + state: z.string().optional(), + members: z.array(MemberDescriptionSchema).optional(), partitionAssignor: z.string().nullable().optional(), coordinator: NodeSchema.nullable().optional(), authorizedOperations: z.array(z.string()).nullable().optional(), - offsets: z.array(OffsetAndMetadataSchema), + offsets: z.array(OffsetAndMetadataSchema).optional(), errors: z.array(ApiError).optional(), }), }); diff --git a/ui/api/kafka/actions.ts b/ui/api/kafka/actions.ts index ac7489504..25528466a 100644 --- a/ui/api/kafka/actions.ts +++ b/ui/api/kafka/actions.ts @@ -49,7 +49,11 @@ export async function getKafkaClusters(): Promise { export async function getKafkaCluster( clusterId: string, ): Promise { - const url = `${process.env.BACKEND_URL}/api/kafkas/${clusterId}/?fields%5Bkafkas%5D=name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,bootstrapServers,listeners,authType`; + const sp = new URLSearchParams({ + "fields[kafkas]": "name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,bootstrapServers,listeners,authType,conditions" + }); + const kafkaClusterQuery = sp.toString(); + const url = `${process.env.BACKEND_URL}/api/kafkas/${clusterId}?${kafkaClusterQuery}`; try { const res = await fetch(url, { headers: await getHeaders(), diff --git a/ui/api/kafka/schema.ts b/ui/api/kafka/schema.ts index 5b957472a..cf54d80e2 100644 --- a/ui/api/kafka/schema.ts +++ b/ui/api/kafka/schema.ts @@ -42,6 +42,15 @@ const ClusterDetailSchema = z.object({ authType: z.string().nullable(), }), ), + conditions: z.array( + z.object({ + type: z.string().optional(), + status: z.string().optional(), + reason: z.string().optional(), + message: z.string().optional(), + lastTransitionTime: z.string().optional(), + }), + ), }), }); export const ClusterResponse = z.object({ diff --git a/ui/app/[locale]/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.tsx b/ui/app/[locale]/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.tsx index f92744015..c45da1089 100644 --- a/ui/app/[locale]/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.tsx +++ b/ui/app/[locale]/kafka/[kafkaId]/consumer-groups/ConsumerGroupsTable.tsx @@ -97,16 +97,14 @@ export function ConsumerGroupsTable({ o.lag) + ?.map((o) => o.lag) // lag values may not be available from API, e.g. when there is an error listing the topic offsets .reduce((acc, v) => (acc ?? NaN) + (v ?? NaN), 0)} /> ); case "topics": - const allTopics = row.attributes.members.flatMap((m) => - m.assignments.map((a) => a), - ); + const allTopics = row.attributes.members?.flatMap((m) => m.assignments ?? []) ?? []; return ( @@ -129,7 +127,7 @@ export function ConsumerGroupsTable({ case "members": return ( - + ); } diff --git a/ui/app/[locale]/kafka/[kafkaId]/consumer-groups/[groupId]/MembersTable.tsx b/ui/app/[locale]/kafka/[kafkaId]/consumer-groups/[groupId]/MembersTable.tsx index dd1d2dc8c..e4890a734 100644 --- a/ui/app/[locale]/kafka/[kafkaId]/consumer-groups/[groupId]/MembersTable.tsx +++ b/ui/app/[locale]/kafka/[kafkaId]/consumer-groups/[groupId]/MembersTable.tsx @@ -29,17 +29,17 @@ export function MembersTable({ let members: ConsumerGroup["attributes"]["members"] | undefined = undefined; if (consumerGroup) { - if (consumerGroup.attributes.members.length === 0) { - members = consumerGroup.attributes.offsets.map((o) => ({ + if (consumerGroup.attributes.members?.length === 0) { + members = ([{ memberId: "unknown", host: "N/A", clientId: "unknown", - assignments: consumerGroup.attributes.offsets.map((o) => ({ + assignments: consumerGroup.attributes.offsets?.map((o) => ({ topicId: o.topicId, topicName: o.topicName, partition: o.partition, })), - })); + }]); } else { members = consumerGroup.attributes.members; } @@ -87,12 +87,12 @@ export function MembersTable({ ); case "overallLag": - const topics = row.assignments.map((a) => a.topicId); + const topics = row.assignments?.map((a) => a.topicId); return ( topics.includes(o.topicId)) + ?.filter((o) => topics?.includes(o.topicId)) .map((o) => o.lag) // lag values may not be available from API, e.g. when there is an error listing the topic offsets .reduce((acc, v) => (acc ?? NaN) + (v ?? NaN), 0)} @@ -102,7 +102,7 @@ export function MembersTable({ case "assignedPartitions": return ( - + ); } @@ -112,13 +112,13 @@ export function MembersTable({ }} getExpandedRow={({ row }) => { const offsets: ConsumerGroup["attributes"]["offsets"] = - row.assignments.map((a) => ({ + row.assignments?.map((a) => ({ ...a, - ...consumerGroup!.attributes.offsets.find( + ...consumerGroup!.attributes.offsets?.find( (o) => o.topicId === a.topicId && o.partition === a.partition, )!, })); - offsets.sort((a, b) => a.topicName.localeCompare(b.topicName)); + offsets?.sort((a, b) => a.topicName.localeCompare(b.topicName)); return (
diff --git a/ui/app/[locale]/kafka/[kafkaId]/overview/page.tsx b/ui/app/[locale]/kafka/[kafkaId]/overview/page.tsx index 004d4d9d1..92004a34c 100644 --- a/ui/app/[locale]/kafka/[kafkaId]/overview/page.tsx +++ b/ui/app/[locale]/kafka/[kafkaId]/overview/page.tsx @@ -1,3 +1,5 @@ +import { ConsumerGroupsResponse } from "@/api/consumerGroups/schema"; +import { getConsumerGroups } from "@/api/consumerGroups/actions"; import { ClusterMetric, getKafkaClusterKpis, @@ -25,9 +27,10 @@ export default function OverviewPage({ params }: { params: KafkaParams }) { "outgoingByteRate", "incomingByteRate", ]); + const consumerGroups = getConsumerGroups(params.kafkaId, { fields: "state" }); return ( } + clusterOverview={} topicsPartitions={} clusterCharts={} topicCharts={} @@ -37,21 +40,39 @@ export default function OverviewPage({ params }: { params: KafkaParams }) { async function ConnectedClusterCard({ data, + consumerGroups, }: { data: Promise<{ cluster: ClusterDetail; kpis: ClusterKpis } | null>; + consumerGroups: Promise; }) { const res = await data; + const groupCount = await consumerGroups.then(grpResp => grpResp.meta.page.total ?? 0); const brokersTotal = Object.keys(res?.kpis.broker_state || {}).length; const brokersOnline = Object.values(res?.kpis.broker_state || {}).filter((s) => s === 3).length || 0; + const messages = res?.cluster + .attributes + .conditions + ?.filter((c) => "Ready" !== c.type) + .map((c) => ({ + variant: c.type === "Error" ? "danger" : "warning" as ("danger" | "warning"), + subject: { + type: "cluster" as ("cluster" | "broker" | "topic"), + name: res?.cluster.attributes.name ?? "", + id: res?.cluster.id ?? "", + }, + message: c.message ?? "", + date: c.lastTransitionTime ?? "" + })); + return (