Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: kafka Rebalance list #1070

Merged
merged 6 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ui/api/kafka/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export async function getKafkaCluster(
): Promise<ClusterDetail | null> {
const sp = new URLSearchParams({
"fields[kafkas]":
"name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,listeners,conditions,nodePools",
"name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,listeners,conditions,nodePools,cruiseControlEnabled",
});
const kafkaClusterQuery = sp.toString();
const url = `${process.env.BACKEND_URL}/api/kafkas/${clusterId}?${kafkaClusterQuery}`;
Expand Down
1 change: 1 addition & 0 deletions ui/api/kafka/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const ClusterDetailSchema = z.object({
nodes: z.array(NodeSchema),
controller: NodeSchema,
authorizedOperations: z.array(z.string()),
cruiseControlEnabled: z.boolean().optional(),
listeners: z
.array(
z.object({
Expand Down
92 changes: 92 additions & 0 deletions ui/api/rebalance/actions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"use server";
import { logger } from "@/utils/logger";
import {
RebalanceResponse,
RebalanceResponseSchema,
RebalanceSchema,
RebalancesResponse,
RebalanceStatus,
} from "./schema";
import { filterUndefinedFromObj } from "@/utils/filterUndefinedFromObj";
import { getHeaders } from "@/api/api";
import { RebalanceMode } from "./schema";

const log = logger.child({ module: "rebalance-api" });

export async function getRebalancesList(
kafkaId: string,
params: {
name?: string;
mode?: RebalanceMode[];
status?: RebalanceStatus[];
pageSize?: number;
pageCursor?: string;
sort?: string;
sortDir?: string;
},
): Promise<RebalancesResponse> {
const sp = new URLSearchParams(
filterUndefinedFromObj({
"fields[kafkaRebalances]":
"name,namespace,creationTimestamp,status,mode,brokers,optimizationResult",
"filter[name]": params.name ? `like,*${params.name}*` : undefined,
"filter[status]":
params.status && params.status.length > 0
? `in,${params.status.join(",")}`
: undefined,
"filter[mode]":
params.mode && params.mode.length > 0
? `in,${params.mode.join(",")}`
: undefined,
"page[size]": params.pageSize,
"page[after]": params.pageCursor,
sort: params.sort
? (params.sortDir !== "asc" ? "-" : "") + params.sort
: undefined,
}),
);
const rebalanceQuery = sp.toString();
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/rebalances?${rebalanceQuery}`;
const res = await fetch(url, {
headers: await getHeaders(),
next: {
tags: ["rebalances"],
},
});

log.debug({ url }, "getRebalanceList");
const rawData = await res.json();
log.trace({ url, rawData }, "getRebalanceList response");
return RebalanceResponseSchema.parse(rawData);
}

export async function getRebalanceDetails(
kafkaId: string,
rebalanceId: string,
action?: string,
): Promise<RebalanceResponse | boolean> {
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/rebalances/${rebalanceId}`;
const decodedRebalanceId = decodeURIComponent(rebalanceId);
const body = {
data: {
type: "kafkaRebalances",
id: decodedRebalanceId,
meta: {
action: action,
},
attributes: {},
},
};
log.debug({ url }, "Fetching rebalance details");
const res = await fetch(url, {
headers: await getHeaders(),
method: "PATCH",
body: JSON.stringify(body),
});
if (action) {
return res.ok;
} else {
const rawData = await res.json();
return RebalanceSchema.parse(rawData.data);
}
}
102 changes: 102 additions & 0 deletions ui/api/rebalance/schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { z } from "zod";

const RebalanceStatusSchema = z.union([
z.literal("New"),
z.literal("PendingProposal"),
z.literal("ProposalReady"),
z.literal("Rebalancing"),
z.literal("Stopped"),
z.literal("NotReady"),
z.literal("Ready"),
z.literal("ReconciliationPaused"),
]);

const ModeSchema = z.union([
z.literal("full"),
z.literal("add-brokers"),
z.literal("remove-brokers"),
]);

const OptimizationResultSchema = z.object({
numIntraBrokerReplicaMovements: z.number().optional(),
numReplicaMovements: z.number().optional(),
onDemandBalancednessScoreAfter: z.number().optional(),
afterBeforeLoadConfigMap: z.string().optional(),
intraBrokerDataToMoveMB: z.number().optional(),
monitoredPartitionsPercentage: z.number().optional(),
provisionRecommendation: z.string().optional(),
excludedBrokersForReplicaMove: z.array(z.string()).nullable().optional(),
excludedBrokersForLeadership: z.array(z.string()).nullable().optional(),
provisionStatus: z.string().optional(),
onDemandBalancednessScoreBefore: z.number().optional(),
recentWindows: z.number().optional(),
dataToMoveMB: z.number().optional(),
excludedTopics: z.array(z.string()).nullable().optional(),
numLeaderMovements: z.number().optional(),
});

export const RebalanceSchema = z.object({
id: z.string(),
type: z.literal("kafkaRebalances"),
meta: z
.object({
autoApproval: z.boolean().optional(),
allowedActions: z.array(z.string()),
})
.optional(),
attributes: z.object({
name: z.string(),
namespace: z.string(),
creationTimestamp: z.string(),
status: RebalanceStatusSchema,
mode: ModeSchema,
brokers: z.array(z.number()).nullable(),
sessionId: z.string().nullable(),
optimizationResult: OptimizationResultSchema,
}),
});

const RebalancesListSchema = z.object({
id: z.string(),
type: z.literal("kafkaRebalances"),
meta: z.object({
page: z.object({
cursor: z.string(),
}),
autoApproval: z.boolean(),
managed: z.boolean().optional(),
allowedActions: z.array(z.string()),
}),
attributes: RebalanceSchema.shape.attributes.pick({
name: true,
status: true,
creationTimestamp: true,
mode: true,
brokers: true,
optimizationResult: true
}),
});

export const RebalanceResponseSchema = z.object({
meta: z.object({
page: z.object({
total: z.number(),
pageNumber: z.number().optional(),
}),
}),
links: z.object({
first: z.string().nullable(),
prev: z.string().nullable(),
next: z.string().nullable(),
last: z.string().nullable(),
}),
data: z.array(RebalancesListSchema),
});
export type RebalanceList = z.infer<typeof RebalancesListSchema>;
export type RebalancesResponse = z.infer<typeof RebalanceResponseSchema>;

export type RebalanceResponse = z.infer<typeof RebalanceSchema>;

export type RebalanceStatus = z.infer<typeof RebalanceStatusSchema>;

export type RebalanceMode = z.infer<typeof ModeSchema>;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { BreadcrumbItem } from "@/libs/patternfly/react-core";

export default function TopicsActiveBreadcrumb() {
export default function NodesActiveBreadcrumb() {
return <BreadcrumbItem showDivider={true}>Brokers</BreadcrumbItem>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { BreadcrumbItem } from "@/libs/patternfly/react-core";

export default function RebalanceActiveBreadcrumb() {
return <BreadcrumbItem showDivider={true}>Brokers</BreadcrumbItem>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,49 @@ import { getKafkaCluster } from "@/api/kafka/actions";
import { KafkaParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/kafka.params";
import { AppHeader } from "@/components/AppHeader";
import { Number } from "@/components/Format/Number";
import { Label, Spinner, Split, SplitItem } from "@/libs/patternfly/react-core";
import { NavItemLink } from "@/components/Navigation/NavItemLink";
import {
Label,
Nav,
NavList,
PageNavigation,
Spinner,
Split,
SplitItem,
} from "@/libs/patternfly/react-core";
import { CheckCircleIcon } from "@/libs/patternfly/react-icons";
import { Suspense } from "react";

export default function NodesHeader({ params }: { params: KafkaParams }) {
return (
<Suspense fallback={<Header />}>
<Suspense
fallback={<Header kafkaId={undefined} cruiseControlEnable={false} />}
>
<ConnectedHeader params={params} />
</Suspense>
);
}

async function ConnectedHeader({ params }: { params: KafkaParams }) {
const cluster = await getKafkaCluster(params.kafkaId);
return <Header total={cluster?.attributes.nodes.length || 0} />;
return (
<Header
total={cluster?.attributes.nodes.length || 0}
kafkaId={cluster?.id}
cruiseControlEnable={cluster?.attributes.cruiseControlEnabled || false}
/>
);
}

function Header({ total }: { total?: number }) {
function Header({
total,
kafkaId,
cruiseControlEnable,
}: {
total?: number;
kafkaId: string | undefined;
cruiseControlEnable: boolean;
}) {
return (
<AppHeader
title={
Expand All @@ -41,6 +66,22 @@ function Header({ total }: { total?: number }) {
</SplitItem>
</Split>
}
navigation={
<PageNavigation>
<Nav aria-label="Node navigation" variant="tertiary">
<NavList>
<NavItemLink url={`/kafka/${kafkaId}/nodes`}>
Overview
</NavItemLink>
{cruiseControlEnable && (
<NavItemLink url={`/kafka/${kafkaId}/nodes/rebalances`}>
Rebalance
</NavItemLink>
)}
</NavList>
</Nav>
</PageNavigation>
}
/>
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { getKafkaCluster } from "@/api/kafka/actions";
import { KafkaParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/kafka.params";
import { AppHeader } from "@/components/AppHeader";
import { Number } from "@/components/Format/Number";
import { NavItemLink } from "@/components/Navigation/NavItemLink";
import {
Label,
Nav,
NavList,
PageNavigation,
Spinner,
Split,
SplitItem,
} from "@/libs/patternfly/react-core";
import { CheckCircleIcon } from "@/libs/patternfly/react-icons";
import { Suspense } from "react";

export default function NodesHeader({ params }: { params: KafkaParams }) {
return (
<Suspense
fallback={<Header kafkaId={undefined} cruiseControlEnable={false} />}
>
<ConnectedHeader params={params} />
</Suspense>
);
}

async function ConnectedHeader({ params }: { params: KafkaParams }) {
const cluster = await getKafkaCluster(params.kafkaId);
return (
<Header
total={cluster?.attributes.nodes.length || 0}
kafkaId={cluster?.id}
cruiseControlEnable={cluster?.attributes.cruiseControlEnabled || false}
/>
);
}

function Header({
total,
kafkaId,
cruiseControlEnable,
}: {
total?: number;
kafkaId: string | undefined;
cruiseControlEnable: boolean;
}) {
return (
<AppHeader
title={
<Split hasGutter={true}>
<SplitItem>Brokers</SplitItem>
<SplitItem>
<Label
color={"green"}
icon={
total === undefined ? (
<Spinner size={"sm"} />
) : (
<CheckCircleIcon />
)
}
>
{total && <Number value={total} />}
</Label>
</SplitItem>
</Split>
}
navigation={
<PageNavigation>
<Nav aria-label="Node navigation" variant="tertiary">
<NavList>
<NavItemLink url={`/kafka/${kafkaId}/nodes`}>
Overview
</NavItemLink>
{cruiseControlEnable && (
<NavItemLink url={`/kafka/${kafkaId}/nodes/rebalances`}>
Rebalance
</NavItemLink>
)}
</NavList>
</Nav>
</PageNavigation>
}
/>
);
}
Loading