Map consumer group counts and Kafka conditions to cluster overview (#393)
MikeEdgar authored Jan 19, 2024
1 parent 1790a8b commit 600cc56
Showing 7 changed files with 58 additions and 26 deletions.
4 changes: 2 additions & 2 deletions ui/api/consumerGroups/actions.ts
@@ -31,6 +31,7 @@ export async function getConsumerGroup(
 export async function getConsumerGroups(
   kafkaId: string,
   params: {
+    fields?: string;
     pageSize?: number;
     pageCursor?: string;
     sort?: string;
@@ -39,8 +40,7 @@ export async function getConsumerGroups(
 ): Promise<ConsumerGroupsResponse> {
   const sp = new URLSearchParams(
     filterUndefinedFromObj({
-      "fields[consumerGroups]":
-        "state,simpleConsumerGroup,members,offsets",
+      "fields[consumerGroups]": params.fields ?? "state,simpleConsumerGroup,members,offsets",
       // TODO: pass filter from UI
       "filter[state]": "in,STABLE,PREPARING_REBALANCE,COMPLETING_REBALANCE",
       "page[size]": params.pageSize,
10 changes: 5 additions & 5 deletions ui/api/consumerGroups/schema.ts
@@ -22,19 +22,19 @@ const MemberDescriptionSchema = z.object({
   groupInstanceId: z.string().nullable().optional(),
   clientId: z.string(),
   host: z.string(),
-  assignments: z.array(PartitionKeySchema),
+  assignments: z.array(PartitionKeySchema).optional(),
 });
 export const ConsumerGroupSchema = z.object({
   id: z.string(),
   type: z.literal("consumerGroups"),
   attributes: z.object({
-    simpleConsumerGroup: z.boolean(),
-    state: z.string(),
-    members: z.array(MemberDescriptionSchema),
+    simpleConsumerGroup: z.boolean().optional(),
+    state: z.string().optional(),
+    members: z.array(MemberDescriptionSchema).optional(),
     partitionAssignor: z.string().nullable().optional(),
     coordinator: NodeSchema.nullable().optional(),
     authorizedOperations: z.array(z.string()).nullable().optional(),
-    offsets: z.array(OffsetAndMetadataSchema),
+    offsets: z.array(OffsetAndMetadataSchema).optional(),
     errors: z.array(ApiError).optional(),
   }),
 });
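Taken together, the two consumer-group changes let callers request a sparse field set: the new fields parameter is forwarded as fields[consumerGroups], and every attribute the API may then omit is marked .optional() in the Zod schema. A minimal sketch of such a call, mirroring what the overview page in this commit does (only the count derivation comment below is illustrative):

// Request only the "state" attribute; members/offsets/etc. may be absent,
// and ConsumerGroupSchema still parses the response.
const groups = await getConsumerGroups(kafkaId, { fields: "state" });
const groupCount = groups.meta.page.total ?? 0;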
6 changes: 5 additions & 1 deletion ui/api/kafka/actions.ts
@@ -49,7 +49,11 @@ export async function getKafkaClusters(): Promise<ClusterList[]> {
 export async function getKafkaCluster(
   clusterId: string,
 ): Promise<ClusterDetail | null> {
-  const url = `${process.env.BACKEND_URL}/api/kafkas/${clusterId}/?fields%5Bkafkas%5D=name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,bootstrapServers,listeners,authType`;
+  const sp = new URLSearchParams({
+    "fields[kafkas]": "name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,bootstrapServers,listeners,authType,conditions"
+  });
+  const kafkaClusterQuery = sp.toString();
+  const url = `${process.env.BACKEND_URL}/api/kafkas/${clusterId}?${kafkaClusterQuery}`;
   try {
     const res = await fetch(url, {
       headers: await getHeaders(),
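A side effect of building the query with URLSearchParams instead of hand-encoding it is that brackets and commas in the sparse-fieldset parameter are percent-encoded. A small sketch of what the refactored code produces, with a shortened field list for readability:

// URLSearchParams applies application/x-www-form-urlencoded encoding:
// "[" -> %5B, "]" -> %5D, "," -> %2C
const sp = new URLSearchParams({ "fields[kafkas]": "name,status,conditions" });
console.log(sp.toString());
// fields%5Bkafkas%5D=name%2Cstatus%2Cconditions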
9 changes: 9 additions & 0 deletions ui/api/kafka/schema.ts
@@ -42,6 +42,15 @@ const ClusterDetailSchema = z.object({
         authType: z.string().nullable(),
       }),
     ),
+    conditions: z.array(
+      z.object({
+        type: z.string().optional(),
+        status: z.string().optional(),
+        reason: z.string().optional(),
+        message: z.string().optional(),
+        lastTransitionTime: z.string().optional(),
+      }),
+    ),
   }),
 });
 export const ClusterResponse = z.object({
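The new conditions entries carry the cluster's status conditions, and every field is optional so partially populated conditions still parse. An illustrative value the schema would accept (the type and reason strings below are made up, not taken from a real cluster):

const condition = {
  type: "NotReady",                       // anything other than "Ready" is surfaced as an overview message (see page.tsx below)
  status: "True",
  reason: "SomeFailureReason",            // hypothetical reason string
  message: "Example condition message",
  lastTransitionTime: "2024-01-19T00:00:00Z",
};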
@@ -97,16 +97,14 @@ export function ConsumerGroupsTable({
           <Td key={key} dataLabel={"Overall lag"}>
             <Number
               value={row.attributes.offsets
-                .map((o) => o.lag)
+                ?.map((o) => o.lag)
                 // lag values may not be available from API, e.g. when there is an error listing the topic offsets
                 .reduce((acc, v) => (acc ?? NaN) + (v ?? NaN), 0)}
             />
           </Td>
         );
       case "topics":
-        const allTopics = row.attributes.members.flatMap((m) =>
-          m.assignments.map((a) => a),
-        );
+        const allTopics = row.attributes.members?.flatMap((m) => m.assignments ?? []) ?? [];
         return (
           <Td key={key} dataLabel={"Assigned topics"}>
             <LabelGroup>
@@ -129,7 +127,7 @@
       case "members":
         return (
           <Td key={key} dataLabel={"Members"}>
-            <Number value={row.attributes.members.length} />
+            <Number value={row.attributes.members?.length} />
           </Td>
         );
     }
@@ -29,17 +29,17 @@ export function MembersTable({
   let members: ConsumerGroup["attributes"]["members"] | undefined = undefined;

   if (consumerGroup) {
-    if (consumerGroup.attributes.members.length === 0) {
-      members = consumerGroup.attributes.offsets.map((o) => ({
+    if (consumerGroup.attributes.members?.length === 0) {
+      members = ([{
         memberId: "unknown",
         host: "N/A",
         clientId: "unknown",
-        assignments: consumerGroup.attributes.offsets.map((o) => ({
+        assignments: consumerGroup.attributes.offsets?.map((o) => ({
           topicId: o.topicId,
           topicName: o.topicName,
           partition: o.partition,
         })),
-      }));
+      }]);
     } else {
       members = consumerGroup.attributes.members;
     }
@@ -87,12 +87,12 @@
             </Td>
           );
         case "overallLag":
-          const topics = row.assignments.map((a) => a.topicId);
+          const topics = row.assignments?.map((a) => a.topicId);
           return (
             <Td key={key} dataLabel={"Overall lag"}>
               <Number
                 value={consumerGroup!.attributes.offsets
-                  .filter((o) => topics.includes(o.topicId))
+                  ?.filter((o) => topics?.includes(o.topicId))
                   .map((o) => o.lag)
                   // lag values may not be available from API, e.g. when there is an error listing the topic offsets
                   .reduce((acc, v) => (acc ?? NaN) + (v ?? NaN), 0)}
@@ -102,7 +102,7 @@
         case "assignedPartitions":
          return (
            <Td key={key} dataLabel={"Assigned partitions"}>
-              <Number value={row.assignments.length} />
+              <Number value={row.assignments?.length} />
            </Td>
          );
      }
@@ -112,13 +112,13 @@
        }}
        getExpandedRow={({ row }) => {
          const offsets: ConsumerGroup["attributes"]["offsets"] =
-            row.assignments.map((a) => ({
+            row.assignments?.map((a) => ({
              ...a,
-              ...consumerGroup!.attributes.offsets.find(
+              ...consumerGroup!.attributes.offsets?.find(
                (o) => o.topicId === a.topicId && o.partition === a.partition,
              )!,
            }));
-          offsets.sort((a, b) => a.topicName.localeCompare(b.topicName));
+          offsets?.sort((a, b) => a.topicName.localeCompare(b.topicName));
          return (
            <div className={"pf-v5-u-p-lg"}>
              <LagTable kafkaId={kafkaId} offsets={offsets} />
27 changes: 24 additions & 3 deletions ui/app/[locale]/kafka/[kafkaId]/overview/page.tsx
@@ -1,3 +1,5 @@
+import { ConsumerGroupsResponse } from "@/api/consumerGroups/schema";
+import { getConsumerGroups } from "@/api/consumerGroups/actions";
 import {
   ClusterMetric,
   getKafkaClusterKpis,
@@ -25,9 +27,10 @@ export default function OverviewPage({ params }: { params: KafkaParams }) {
     "outgoingByteRate",
     "incomingByteRate",
   ]);
+  const consumerGroups = getConsumerGroups(params.kafkaId, { fields: "state" });
   return (
     <PageLayout
-      clusterOverview={<ConnectedClusterCard data={kpi} />}
+      clusterOverview={<ConnectedClusterCard data={kpi} consumerGroups={consumerGroups} />}
       topicsPartitions={<ConnectedTopicsPartitionsCard data={kpi} />}
       clusterCharts={<ConnectedClusterChartsCard data={cluster} />}
       topicCharts={<ConnectedTopicChartsCard data={topic} />}
@@ -37,21 +40,39 @@

 async function ConnectedClusterCard({
   data,
+  consumerGroups,
 }: {
   data: Promise<{ cluster: ClusterDetail; kpis: ClusterKpis } | null>;
+  consumerGroups: Promise<ConsumerGroupsResponse>;
 }) {
   const res = await data;
+  const groupCount = await consumerGroups.then(grpResp => grpResp.meta.page.total ?? 0);
   const brokersTotal = Object.keys(res?.kpis.broker_state || {}).length;
   const brokersOnline =
     Object.values(res?.kpis.broker_state || {}).filter((s) => s === 3).length ||
     0;
+  const messages = res?.cluster
+    .attributes
+    .conditions
+    ?.filter((c) => "Ready" !== c.type)
+    .map((c) => ({
+      variant: c.type === "Error" ? "danger" : "warning" as ("danger" | "warning"),
+      subject: {
+        type: "cluster" as ("cluster" | "broker" | "topic"),
+        name: res?.cluster.attributes.name ?? "",
+        id: res?.cluster.id ?? "",
+      },
+      message: c.message ?? "",
+      date: c.lastTransitionTime ?? ""
+    }));
+
   return (
     <ClusterCard
       isLoading={false}
       status={res?.cluster.attributes.status || "n/a"}
-      messages={[]}
+      messages={messages ?? []}
       name={res?.cluster.attributes.name || "n/a"}
-      consumerGroups={0}
+      consumerGroups={groupCount}
       brokersOnline={brokersOnline}
       brokersTotal={brokersTotal}
       kafkaVersion={res?.cluster.attributes.kafkaVersion || "n/a"}
