Skip to content

Commit

Permalink
Kafka rebalance list
Browse files Browse the repository at this point in the history
Signed-off-by: hemahg <[email protected]>
  • Loading branch information
hemahg committed Sep 30, 2024
1 parent cf469d7 commit 900fc52
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 1 deletion.
2 changes: 1 addition & 1 deletion ui/api/kafka/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export async function getKafkaCluster(
): Promise<ClusterDetail | null> {
const sp = new URLSearchParams({
"fields[kafkas]":
"name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,listeners,conditions,nodePools",
"name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,listeners,conditions,nodePools,cruiseControlEnabled",
});
const kafkaClusterQuery = sp.toString();
const url = `${process.env.BACKEND_URL}/api/kafkas/${clusterId}?${kafkaClusterQuery}`;
Expand Down
1 change: 1 addition & 0 deletions ui/api/kafka/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const ClusterDetailSchema = z.object({
nodes: z.array(NodeSchema),
controller: NodeSchema,
authorizedOperations: z.array(z.string()),
cruiseControlEnabled: z.boolean().optional(),
listeners: z
.array(
z.object({
Expand Down
81 changes: 81 additions & 0 deletions ui/api/rebalance/actions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"use server";
import { logger } from "@/utils/logger";
import {
RebalanceResponse,
RebalanceResponseSchema,
RebalanceSchema,
RebalancesResponse,
RebalanceStatus,
} from "./schema";
import { filterUndefinedFromObj } from "@/utils/filterUndefinedFromObj";
import { getHeaders } from "@/api/api";

const log = logger.child({ module: "rebalance-api" });

export async function getRebalancesList(
kafkaId: string,
params: {
name?: string;
mode?: string;
status?: RebalanceStatus[];
pageSize?: number;
pageCursor?: string;
sort?: string;
sortDir?: string;
},
): Promise<RebalancesResponse> {
const sp = new URLSearchParams(
filterUndefinedFromObj({
"fields[kafkaRebalances]":
"name,namespace,creationTimestamp,status,mode,brokers",
"filter[name]": params.name ? `like,*${params.name}*` : undefined,
"filter[status]":
params.status && params.status.length > 0
? `in,${params.status.join(",")}`
: undefined,
"filter[mode]": params.mode ? `like,*${params.mode}*` : undefined,
"page[size]": params.pageSize,
"page[after]": params.pageCursor,
sort: params.sort
? (params.sortDir !== "asc" ? "-" : "") + params.sort
: undefined,
}),
);
const rebalanceQuery = sp.toString();
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/rebalances?${rebalanceQuery}`;
const res = await fetch(url, {
headers: await getHeaders(),
next: {
tags: ["rebalances"],
},
});

log.debug({ url }, "getRebalanceList");
const rawData = await res.json();
log.trace({ url, rawData }, "getRebalanceList response");
return RebalanceResponseSchema.parse(rawData);
}

export async function getRebalanceDetails(
kafkaId: string,
rebalanceId: string,
): Promise<RebalanceResponse> {
const url = `${process.env.BACKEND_URL}/api/kafkas/${kafkaId}/rebalances/${rebalanceId}`;
const decodedRebalanceId = decodeURIComponent(rebalanceId);
const body = {
data: {
type: "kafkaRebalances",
id: decodedRebalanceId,
meta: {},
attributes: {},
},
};
log.debug({ url }, "Fetching rebalance details");
const res = await fetch(url, {
headers: await getHeaders(),
method: "PATCH",
body: JSON.stringify(body),
});
const rawData = await res.json();
return RebalanceSchema.parse(rawData.data);
}
92 changes: 92 additions & 0 deletions ui/api/rebalance/schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { z } from "zod";

const RebalanceStatusSchema = z.union([
z.literal("New"),
z.literal("PendingProposal"),
z.literal("ProposalReady"),
z.literal("Rebalancing"),
z.literal("Stopped"),
z.literal("NotReady"),
z.literal("Ready"),
z.literal("ReconciliationPaused"),
]);

const OptimizationResultSchema = z.object({
numIntraBrokerReplicaMovements: z.number(),
numReplicaMovements: z.number(),
onDemandBalancednessScoreAfter: z.number(),
afterBeforeLoadConfigMap: z.string(),
intraBrokerDataToMoveMB: z.number(),
monitoredPartitionsPercentage: z.number(),
provisionRecommendation: z.string(),
excludedBrokersForReplicaMove: z.array(z.string()).nullable(),
excludedBrokersForLeadership: z.array(z.string()).nullable(),
provisionStatus: z.string(),
onDemandBalancednessScoreBefore: z.number(),
recentWindows: z.number(),
dataToMoveMB: z.number(),
excludedTopics: z.array(z.string()).nullable(),
numLeaderMovements: z.number(),
});

export const RebalanceSchema = z.object({
id: z.string(),
type: z.literal("kafkaRebalances"),
meta: z
.object({
autoApproval: z.boolean().optional(),
allowedActions: z.array(z.string()),
})
.optional(),
attributes: z.object({
name: z.string(),
namespace: z.string(),
creationTimestamp: z.string(),
status: RebalanceStatusSchema,
mode: z.string().optional(),
brokers: z.array(z.number()).nullable(),
sessionId: z.string().optional(),
optimizationResult: OptimizationResultSchema,
}),
});

const RebalancesListSchema = z.object({
id: z.string(),
type: z.literal("kafkaRebalances"),
meta: z.object({
page: z.object({
cursor: z.string(),
}),
autoApproval: z.boolean(),
managed: z.boolean().optional(),
}),
attributes: RebalanceSchema.shape.attributes.pick({
name: true,
status: true,
creationTimestamp: true,
mode: true,
brokers: true,
}),
});

export const RebalanceResponseSchema = z.object({
meta: z.object({
page: z.object({
total: z.number(),
pageNumber: z.number().optional(),
}),
}),
links: z.object({
first: z.string().nullable(),
prev: z.string().nullable(),
next: z.string().nullable(),
last: z.string().nullable(),
}),
data: z.array(RebalancesListSchema),
});
export type RebalanceList = z.infer<typeof RebalancesListSchema>;
export type RebalancesResponse = z.infer<typeof RebalanceResponseSchema>;

export type RebalanceResponse = z.infer<typeof RebalanceSchema>;

export type RebalanceStatus = z.infer<typeof RebalanceStatusSchema>;

0 comments on commit 900fc52

Please sign in to comment.