Cluster overview page #206

Merged · 7 commits · Dec 13, 2023
Changes from 3 commits

4 changes: 2 additions & 2 deletions ui/.env
@@ -1,4 +1,4 @@
-NEXT_PUBLIC_KEYCLOAK_URL="https://console-keycloak.mycluster-us-east-107719-da779aef12eee96bd4161f4e402b70ec-0000.us-east.containers.appdomain.cloud/realms/demo"
+#NEXT_PUBLIC_KEYCLOAK_URL="https://console-keycloak.mycluster-us-east-107719-da779aef12eee96bd4161f4e402b70ec-0000.us-east.containers.appdomain.cloud/realms/demo"
 BACKEND_URL=https://console-api-eyefloaters-dev.mycluster-us-east-107719-da779aef12eee96bd4161f4e402b70ec-0000.us-east.containers.appdomain.cloud
 LOG_LEVEL="info"
-CONSOLE_MODE="read-write"
+CONSOLE_MODE="read-write"
5 changes: 5 additions & 0 deletions ui/.storybook/preview-head.html
@@ -0,0 +1,5 @@
<style>
html, body, #storybook-root {
height: 100%;
}
</style>
19 changes: 0 additions & 19 deletions ui/.storybook/preview.ts

This file was deleted.

32 changes: 32 additions & 0 deletions ui/.storybook/preview.tsx
@@ -0,0 +1,32 @@
import NextIntlProvider from "@/app/[locale]/NextIntlProvider";
import { Page } from "@patternfly/react-core";
import type { Preview } from "@storybook/react";
import "../app/globals.css";
import messages from "../messages/en.json";

const preview: Preview = {
parameters: {
nextjs: {
appDirectory: true,
},
actions: { argTypesRegex: "^on[A-Z].*" },
controls: {
matchers: {
color: /(background|color)$/i,
date: /Date$/i,
},
},
layout: "fullscreen",
},
decorators: [
(Story) => (
<NextIntlProvider locale={"en"} messages={messages}>
<Page>
<Story />
</Page>
</NextIntlProvider>
),
],
};

export default preview;
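
With this decorator in place, individual stories need no provider setup of their own. As an illustration, here is a minimal sketch of a story file that relies on it (the `ClusterCard` component and its `name` prop are hypothetical, not part of this PR):

import type { Meta, StoryObj } from "@storybook/react";
import { ClusterCard } from "./ClusterCard"; // hypothetical component

const meta: Meta<typeof ClusterCard> = {
  component: ClusterCard,
};
export default meta;

type Story = StoryObj<typeof ClusterCard>;

// No NextIntlProvider or Page wrapper needed here: the global decorator
// in preview.tsx already supplies both for every story.
export const Default: Story = {
  args: {
    name: "my-cluster", // hypothetical prop
  },
};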
194 changes: 192 additions & 2 deletions ui/api/kafka/actions.ts
@@ -2,11 +2,25 @@
import { getHeaders } from "@/api/api";
import {
ClusterDetail,
ClusterKpis,
ClusterKpisSchema,
ClusterList,
ClusterMetricRange,
ClusterMetricRangeSchema,
ClusterResponse,
ClustersResponseSchema,
} from "@/api/kafka/schema";
import { logger } from "@/utils/logger";
import groupBy from "lodash.groupby";
import { PrometheusDriver } from "prometheus-query";
import * as ranges from "./ranges.promql";
import { values } from "./values.promql";

export type Range = keyof typeof ranges;

const prom = new PrometheusDriver({
endpoint: process.env.CONSOLE_METRICS_PROMETHEUS_URL,
});

const log = logger.child({ module: "kafka-api" });

@@ -27,15 +41,191 @@ export async function getKafkaClusters(): Promise<ClusterList[]> {
export async function getKafkaCluster(
clusterId: string,
): Promise<ClusterDetail | null> {
-const url = `${process.env.BACKEND_URL}/api/kafkas/${clusterId}/?fields%5Bkafkas%5D=name,namespace,creationTimestamp,nodes,controller,authorizedOperations,bootstrapServers,authType,metrics`;
+const url = `${process.env.BACKEND_URL}/api/kafkas/${clusterId}/?fields%5Bkafkas%5D=name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,bootstrapServers,authType`;
try {
const res = await fetch(url, {
headers: await getHeaders(),
cache: "force-cache",
});
const rawData = await res.json();
log.debug(rawData, "getKafkaCluster response");
return ClusterResponse.parse(rawData).data;
} catch (err) {
-log.error(err, "getKafkaCluster");
+log.error({ err, clusterId }, "getKafkaCluster");
return null;
}
}

export async function getKafkaClusterKpis(
clusterId: string,
): Promise<{ cluster: ClusterDetail; kpis: ClusterKpis } | null> {
try {
const cluster = await getKafkaCluster(clusterId);
if (!cluster) {
return null;
}

const valuesRes = await prom.instantQuery(
values(
cluster.attributes.namespace,
cluster.attributes.name,
cluster.attributes.controller.id,
),
);

/*
Prometheus returns the data unaggregated, e.g.:

[
{
"metric": {
"labels": {
"__console_metric_name__": "broker_state",
"nodeId": "2"
}
},
"value": {
"time": "2023-12-12T16:00:53.381Z",
"value": 3
}
},
...
]

We start by flattening the labels, and then group by metric name
*/
const groupedMetrics = groupBy(
valuesRes.result.map((serie) => ({
metric: serie.metric.labels.__console_metric_name__,
nodeId: serie.metric.labels.nodeId,
time: serie.value.time,
value: serie.value.value,
})),
(v) => v.metric,
);

/*
Now we transform the data into something easier to work with in the UI.

Some metrics are totals that arrive as an array with a single entry; for these we just need the
number, and they end up as a metric:value mapping.

Other KPIs are provided split by broker id. Of these, some are counts (identified by the string
`_count` in the metric name) and some are other per-node values. Both are grouped by nodeId.
A `_count` metric gets a value with two properties: `byNode` holds the per-node grouping and
`total` holds the sum of all the counts.
The remaining per-node metrics end up as a metric:{node:value} mapping.

Expected result:
{
"broker_state": {
"0": 3,
"1": 3,
"2": 3
},
"total_topics": 5,
"total_partitions": 55,
"underreplicated_topics": 0,
"replica_count": {
"byNode": {
"0": 57,
"1": 54,
"2": 54
},
"total": 165
},
"leader_count": {
"byNode": {
"0": 19,
"1": 18,
"2": 18
},
"total": 55
}
}
*/
const kpis = Object.fromEntries(
Object.entries(groupedMetrics).map(([metric, value]) => {
const total = value.reduce((acc, v) => acc + v.value, 0);
if (value.find((v) => v.nodeId)) {
const byNode = Object.fromEntries(
value.map(({ nodeId, value }) =>
nodeId ? [nodeId, value] : ["value", value],
),
);
return metric.includes("_count")
? [
metric,
{
byNode,
total,
},
]
: [metric, byNode];
} else {
return [metric, total];
}
}),
);
log.debug({ kpis, clusterId }, "getKafkaClusterKpis");
return {
cluster,
kpis: ClusterKpisSchema.parse(kpis),
};
} catch (err) {
log.error({ err, clusterId }, "getKafkaClusterKpis");
return null;
}
}

export async function getKafkaClusterMetrics(
clusterId: string,
metrics: Array<Range>,
): Promise<{
cluster: ClusterDetail;
ranges: Record<Range, ClusterMetricRange>;
} | null> {
async function getRange(namespace: string, name: string, metric: Range) {
const start = new Date().getTime() - 1 * 60 * 60 * 1000;
const end = new Date();
const step = 60 * 2;
const seriesRes = await prom.rangeQuery(
ranges[metric](namespace, name),
start,
end,
step,
);
const range = Object.fromEntries(
seriesRes.result.flatMap((serie) =>
serie.values.map((v: any) => [new Date(v.time).getTime(), v.value]),
),
);
return [metric, ClusterMetricRangeSchema.parse(range)];
}

try {
const cluster = await getKafkaCluster(clusterId);
if (!cluster) {
return null;
}

const rangesRes = Object.fromEntries(
await Promise.all(
metrics.map((m) =>
getRange(cluster.attributes.namespace, cluster.attributes.name, m),
),
),
);
log.debug(
{ ranges: rangesRes, clusterId, metrics },
"getKafkaClusterMetrics",
);
return {
cluster,
ranges: rangesRes,
};
} catch (err) {
log.error({ err, clusterId, metrics }, "getKafkaClusterMetrics");
return null;
}
}
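
Taken together, a server component can build the whole overview page from these two actions. The following is a minimal sketch of such a page (the route, component name, and rendering are hypothetical; only `getKafkaClusterKpis`, `getKafkaClusterMetrics`, and the `Range` keys come from this PR):

import {
  getKafkaClusterKpis,
  getKafkaClusterMetrics,
} from "@/api/kafka/actions";

// Hypothetical App Router page; `params.clusterId` comes from the route segment.
export default async function ClusterOverviewPage({
  params,
}: {
  params: { clusterId: string };
}) {
  const kpis = await getKafkaClusterKpis(params.clusterId);
  // The Range keys mirror the exports of ranges.promql.ts.
  const metrics = await getKafkaClusterMetrics(params.clusterId, [
    "cpu",
    "memory",
    "incomingByteRate",
    "outgoingByteRate",
  ]);
  // Both actions swallow errors and return null, so the page must degrade gracefully.
  if (!kpis || !metrics) {
    return <p>Cluster metrics are currently unavailable.</p>;
  }
  return (
    <pre>
      {JSON.stringify({ kpis: kpis.kpis, ranges: metrics.ranges }, null, 2)}
    </pre>
  );
}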
95 changes: 95 additions & 0 deletions ui/api/kafka/ranges.promql.ts
@@ -0,0 +1,95 @@
export const cpu = (namespace: string, cluster: string) => `
sum by (nodeId, __console_metric_name__) (
label_replace(
label_replace(
rate(container_cpu_usage_seconds_total{namespace="${namespace}",pod=~"${cluster}-.+-\\\\d+"}[1m]),
"nodeId",
"$1",
"pod",
".+-(\\\\d+)"
),
"__console_metric_name__",
"cpu_usage_seconds",
"",
""
)
)
`;

export const memory = (namespace: string, cluster: string) => `
sum by (nodeId, __console_metric_name__) (
label_replace(
label_replace(
container_memory_usage_bytes{namespace="${namespace}",pod=~"${cluster}-.+-\\\\d+"},
"nodeId",
"$1",
"pod",
".+-(\\\\d+)"
),
"__console_metric_name__",
"memory_usage_bytes",
"",
""
)
)
`;

export const incomingByteRate = (namespace: string, cluster: string) => `
sum by (__console_metric_name__) (
label_replace(
irate(kafka_server_brokertopicmetrics_bytesin_total{topic!="",namespace="${namespace}",pod=~"${cluster}-.+-\\\\d+",strimzi_io_kind="Kafka"}[5m]),
"__console_metric_name__",
"incoming_byte_rate",
"",
""
)
)
`;

export const outgoingByteRate = (namespace: string, cluster: string) => `
sum by (__console_metric_name__) (
label_replace(
irate(kafka_server_brokertopicmetrics_bytesout_total{topic!="",namespace="${namespace}",pod=~"${cluster}-.+-\\\\d+",strimzi_io_kind="Kafka"}[5m]),
"__console_metric_name__",
"outgoing_byte_rate",
"",
""
)
)
`;

export const volumeCapacity = (namespace: string, cluster: string) => `
sum by (nodeId, __console_metric_name__) (
label_replace(
label_replace(
kubelet_volume_stats_capacity_bytes{namespace="${namespace}",persistentvolumeclaim=~"data(?:-\\\\d+)?-${cluster}-.+-\\\\d+"},
"nodeId",
"$1",
"persistentvolumeclaim",
".+-(\\\\d+)"
),
"__console_metric_name__",
"volume_stats_capacity_bytes",
"",
""
)
)
`;

export const volumeUsed = (namespace: string, cluster: string) => `
sum by (nodeId, __console_metric_name__) (
label_replace(
label_replace(
kubelet_volume_stats_used_bytes{namespace="${namespace}",persistentvolumeclaim=~"data(?:-\\\\d+)?-${cluster}-.+-\\\\d+"},
"nodeId",
"$1",
"persistentvolumeclaim",
".+-(\\\\d+)"
),
"__console_metric_name__",
"volume_stats_used_bytes",
"",
""
)
)
`;
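
As a sanity check on the escaping, calling one of these builders renders the final PromQL string. With illustrative arguments, `cpu("kafka", "my-cluster")` evaluates to the query below; the four backslashes in the TypeScript source become `\\d+` in the resulting string, which PromQL in turn unescapes to the regex `\d+`:

sum by (nodeId, __console_metric_name__) (
  label_replace(
    label_replace(
      rate(container_cpu_usage_seconds_total{namespace="kafka",pod=~"my-cluster-.+-\\d+"}[1m]),
      "nodeId",
      "$1",
      "pod",
      ".+-(\\d+)"
    ),
    "__console_metric_name__",
    "cpu_usage_seconds",
    "",
    ""
  )
)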