Skip to content

Commit

Permalink
Rebalance list
Browse files Browse the repository at this point in the history
Signed-off-by: hemahg <[email protected]>
  • Loading branch information
hemahg committed Oct 3, 2024
1 parent 32f9a4a commit 465f013
Show file tree
Hide file tree
Showing 21 changed files with 1,579 additions and 7 deletions.
2 changes: 1 addition & 1 deletion ui/api/kafka/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export async function getKafkaCluster(
): Promise<ClusterDetail | null> {
const sp = new URLSearchParams({
"fields[kafkas]":
"name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,listeners,conditions,nodePools",
"name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,listeners,conditions,nodePools,cruiseControlEnabled",
});
const kafkaClusterQuery = sp.toString();
const url = `${process.env.BACKEND_URL}/api/kafkas/${clusterId}?${kafkaClusterQuery}`;
Expand Down
1 change: 1 addition & 0 deletions ui/api/kafka/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const ClusterDetailSchema = z.object({
nodes: z.array(NodeSchema),
controller: NodeSchema,
authorizedOperations: z.array(z.string()),
cruiseControlEnabled: z.boolean().optional(),
listeners: z
.array(
z.object({
Expand Down
88 changes: 88 additions & 0 deletions ui/api/rebalance/actions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"use server";
import { logger } from "@/utils/logger";
import {
RebalanceResponse,
RebalanceResponseSchema,
RebalanceSchema,
RebalancesResponse,
RebalanceStatus,
} from "./schema";
import { filterUndefinedFromObj } from "@/utils/filterUndefinedFromObj";
import { getHeaders } from "@/api/api";

const log = logger.child({ module: "rebalance-api" });

export async function getRebalancesList(
kafkaId: string,
params: {
name?: string;
mode?: string;
status?: RebalanceStatus[];
pageSize?: number;
pageCursor?: string;
sort?: string;
sortDir?: string;
},
): Promise<RebalancesResponse> {
const sp = new URLSearchParams(
filterUndefinedFromObj({
"fields[kafkaRebalances]":
"name,namespace,creationTimestamp,status,mode,brokers",
"filter[name]": params.name ? `like,*${params.name}*` : undefined,
"filter[status]":
params.status && params.status.length > 0
? `in,${params.status.join(",")}`
: undefined,
"filter[mode]": params.mode ? `like,*${params.mode}*` : undefined,
"page[size]": params.pageSize,
"page[after]": params.pageCursor,
sort: params.sort
? (params.sortDir !== "asc" ? "-" : "") + params.sort
: undefined,
}),
);
const rebalanceQuery = sp.toString();
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/rebalances?${rebalanceQuery}`;
const res = await fetch(url, {
headers: await getHeaders(),
next: {
tags: ["rebalances"],
},
});

log.debug({ url }, "getRebalanceList");
const rawData = await res.json();
log.trace({ url, rawData }, "getRebalanceList response");
return RebalanceResponseSchema.parse(rawData);
}

export async function getRebalanceDetails(
kafkaId: string,
rebalanceId: string,
action?: string,
): Promise<RebalanceResponse | boolean> {
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/rebalances/${rebalanceId}`;
const decodedRebalanceId = decodeURIComponent(rebalanceId);
const body = {
data: {
type: "kafkaRebalances",
id: decodedRebalanceId,
meta: {
action: action,
},
attributes: {},
},
};
log.debug({ url }, "Fetching rebalance details");
const res = await fetch(url, {
headers: await getHeaders(),
method: "PATCH",
body: JSON.stringify(body),
});
if (action) {
return res.ok;
} else {
const rawData = await res.json();
return RebalanceSchema.parse(rawData.data);
}
}
93 changes: 93 additions & 0 deletions ui/api/rebalance/schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { z } from "zod";

const RebalanceStatusSchema = z.union([
z.literal("New"),
z.literal("PendingProposal"),
z.literal("ProposalReady"),
z.literal("Rebalancing"),
z.literal("Stopped"),
z.literal("NotReady"),
z.literal("Ready"),
z.literal("ReconciliationPaused"),
]);

const OptimizationResultSchema = z.object({
numIntraBrokerReplicaMovements: z.number(),
numReplicaMovements: z.number(),
onDemandBalancednessScoreAfter: z.number(),
afterBeforeLoadConfigMap: z.string(),
intraBrokerDataToMoveMB: z.number(),
monitoredPartitionsPercentage: z.number(),
provisionRecommendation: z.string(),
excludedBrokersForReplicaMove: z.array(z.string()).nullable(),
excludedBrokersForLeadership: z.array(z.string()).nullable(),
provisionStatus: z.string(),
onDemandBalancednessScoreBefore: z.number(),
recentWindows: z.number(),
dataToMoveMB: z.number(),
excludedTopics: z.array(z.string()).nullable(),
numLeaderMovements: z.number(),
});

export const RebalanceSchema = z.object({
id: z.string(),
type: z.literal("kafkaRebalances"),
meta: z
.object({
autoApproval: z.boolean().optional(),
allowedActions: z.array(z.string()),
})
.optional(),
attributes: z.object({
name: z.string(),
namespace: z.string(),
creationTimestamp: z.string(),
status: RebalanceStatusSchema,
mode: z.string().optional(),
brokers: z.array(z.number()).nullable(),
sessionId: z.string().nullable(),
optimizationResult: OptimizationResultSchema,
}),
});

const RebalancesListSchema = z.object({
id: z.string(),
type: z.literal("kafkaRebalances"),
meta: z.object({
page: z.object({
cursor: z.string(),
}),
autoApproval: z.boolean(),
managed: z.boolean().optional(),
allowedActions: z.array(z.string()),
}),
attributes: RebalanceSchema.shape.attributes.pick({
name: true,
status: true,
creationTimestamp: true,
mode: true,
brokers: true,
}),
});

export const RebalanceResponseSchema = z.object({
meta: z.object({
page: z.object({
total: z.number(),
pageNumber: z.number().optional(),
}),
}),
links: z.object({
first: z.string().nullable(),
prev: z.string().nullable(),
next: z.string().nullable(),
last: z.string().nullable(),
}),
data: z.array(RebalancesListSchema),
});
export type RebalanceList = z.infer<typeof RebalancesListSchema>;
export type RebalancesResponse = z.infer<typeof RebalanceResponseSchema>;

export type RebalanceResponse = z.infer<typeof RebalanceSchema>;

export type RebalanceStatus = z.infer<typeof RebalanceStatusSchema>;
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,49 @@ import { getKafkaCluster } from "@/api/kafka/actions";
import { KafkaParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/kafka.params";
import { AppHeader } from "@/components/AppHeader";
import { Number } from "@/components/Format/Number";
import { Label, Spinner, Split, SplitItem } from "@/libs/patternfly/react-core";
import { NavItemLink } from "@/components/Navigation/NavItemLink";
import {
Label,
Nav,
NavList,
PageNavigation,
Spinner,
Split,
SplitItem,
} from "@/libs/patternfly/react-core";
import { CheckCircleIcon } from "@/libs/patternfly/react-icons";
import { Suspense } from "react";

export default function NodesHeader({ params }: { params: KafkaParams }) {
return (
<Suspense fallback={<Header />}>
<Suspense
fallback={<Header kafkaId={undefined} cruiseControlEnable={false} />}
>
<ConnectedHeader params={params} />
</Suspense>
);
}

async function ConnectedHeader({ params }: { params: KafkaParams }) {
const cluster = await getKafkaCluster(params.kafkaId);
return <Header total={cluster?.attributes.nodes.length || 0} />;
return (
<Header
total={cluster?.attributes.nodes.length || 0}
kafkaId={cluster?.id}
cruiseControlEnable={cluster?.attributes.cruiseControlEnabled || false}
/>
);
}

function Header({ total }: { total?: number }) {
function Header({
total,
kafkaId,
cruiseControlEnable,
}: {
total?: number;
kafkaId: string | undefined;
cruiseControlEnable: boolean;
}) {
return (
<AppHeader
title={
Expand All @@ -41,6 +66,22 @@ function Header({ total }: { total?: number }) {
</SplitItem>
</Split>
}
navigation={
<PageNavigation>
<Nav aria-label="Node navigation" variant="tertiary">
<NavList>
<NavItemLink url={`/kafka/${kafkaId}/nodes`}>
Overview
</NavItemLink>
{cruiseControlEnable && (
<NavItemLink url={`/kafka/${kafkaId}/nodes/rebalances`}>
Rebalance
</NavItemLink>
)}
</NavList>
</Nav>
</PageNavigation>
}
/>
);
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
import { TopicHeader } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/@header/topics/[topicId]/TopicHeader";
import {
TopicHeader,
TopicHeaderProps,
} from "@/app/[locale]/(authorized)/kafka/[kafkaId]/@header/topics/[topicId]/TopicHeader";

export default TopicHeader;
export default function TopicHeaderNoRefresh(
props: Omit<TopicHeaderProps, "showRefresh">,
) {
return <TopicHeader {...props} showRefresh={false} />;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export default function NodesPage({ params }: { params: KafkaParams }) {
async function ConnectedNodes({ params }: { params: KafkaParams }) {
const t = await getTranslations();
const res = await getKafkaClusterKpis(params.kafkaId);

let { cluster, kpis } = res || {};

const nodes: Node[] = (cluster?.attributes.nodes || []).map((node) => {
Expand Down
Loading

0 comments on commit 465f013

Please sign in to comment.