Commit b2cc560

Add KRaft to example install, support node pool storage metrics

Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar committed Jun 10, 2024
1 parent 72ce164 commit b2cc560

Showing 14 changed files with 156 additions and 35 deletions.
@@ -72,6 +72,7 @@ public Response listClusters(
                     KafkaCluster.Fields.KAFKA_VERSION,
                     KafkaCluster.Fields.STATUS,
                     KafkaCluster.Fields.CONDITIONS,
+                    KafkaCluster.Fields.NODE_POOLS,
                 },
                 message = "list contains a value that is not valid or not available for the operation",
                 payload = ErrorCategory.InvalidQueryParameter.class)
@@ -90,6 +91,7 @@ public Response listClusters(
                     KafkaCluster.Fields.KAFKA_VERSION,
                     KafkaCluster.Fields.STATUS,
                     KafkaCluster.Fields.CONDITIONS,
+                    KafkaCluster.Fields.NODE_POOLS,
                 }))
             List<String> fields,

@@ -134,6 +136,7 @@ public CompletionStage<Response> describeCluster(
                     KafkaCluster.Fields.KAFKA_VERSION,
                     KafkaCluster.Fields.STATUS,
                     KafkaCluster.Fields.CONDITIONS,
+                    KafkaCluster.Fields.NODE_POOLS,
                 },
                 payload = ErrorCategory.InvalidQueryParameter.class)
         @Parameter(
@@ -155,6 +158,7 @@ public CompletionStage<Response> describeCluster(
                     KafkaCluster.Fields.KAFKA_VERSION,
                     KafkaCluster.Fields.STATUS,
                     KafkaCluster.Fields.CONDITIONS,
+                    KafkaCluster.Fields.NODE_POOLS,
                 }))
             List<String> fields) {
@@ -38,6 +38,7 @@ public static class Fields {
         public static final String KAFKA_VERSION = "kafkaVersion";
         public static final String STATUS = "status";
         public static final String CONDITIONS = "conditions";
+        public static final String NODE_POOLS = "nodePools";

         static final Comparator<KafkaCluster> ID_COMPARATOR =
             comparing(KafkaCluster::getId, nullsLast(String::compareTo));
@@ -59,7 +60,8 @@ NAMESPACE, comparing(KafkaCluster::getNamespace),
                 + LISTENERS + ", "
                 + KAFKA_VERSION + ", "
                 + STATUS + ", "
-                + CONDITIONS + ", ";
+                + CONDITIONS + ", "
+                + NODE_POOLS;

         public static final String DESCRIBE_DEFAULT =
                 NAME + ", "
@@ -71,7 +73,8 @@ NAMESPACE, comparing(KafkaCluster::getNamespace),
                 + LISTENERS + ", "
                 + KAFKA_VERSION + ", "
                 + STATUS + ", "
-                + CONDITIONS + ", ";
+                + CONDITIONS + ", "
+                + NODE_POOLS;

         private Fields() {
             // Prevent instances
@@ -133,6 +136,7 @@ public KafkaClusterResource(KafkaCluster data) {
     List<Condition> conditions;
     @JsonIgnore
     boolean configured;
+    List<String> nodePools;

     public KafkaCluster(String id, List<Node> nodes, Node controller, List<String> authorizedOperations) {
         super();
@@ -258,4 +262,12 @@ public boolean isConfigured() {
     public void setConfigured(boolean configured) {
         this.configured = configured;
     }
+
+    public List<String> getNodePools() {
+        return nodePools;
+    }
+
+    public void setNodePools(List<String> nodePools) {
+        this.nodePools = nodePools;
+    }
 }
@@ -161,6 +161,8 @@ void setKafkaClusterStatus(KafkaCluster cluster, Kafka kafka) {
                         c -> cluster.setStatus("NotReady"),
                         () -> cluster.setStatus("Ready"));
             });
+            Optional.ofNullable(status.getKafkaNodePools())
+                .ifPresent(pools -> cluster.setNodePools(pools.stream().map(pool -> pool.getName()).toList()));
         });
     }
@@ -143,6 +143,9 @@ void setup() throws IOException {
                     .withType("Ready")
                     .withStatus("True")
                 .endCondition()
+                .addNewKafkaNodePool()
+                    .withName("my-node-pool")
+                .endKafkaNodePool()
             .endStatus()
             .build())
         .create();
@@ -187,6 +190,7 @@ void testListClusters() {
             .body("data.id", containsInAnyOrder(clusterId1, clusterId2))
             .body("data.attributes.name", containsInAnyOrder("test-kafka1", "test-kafka2"))
             .body("data.find { it.attributes.name == 'test-kafka1'}.attributes.status", is("Ready"))
+            .body("data.find { it.attributes.name == 'test-kafka1'}.attributes.nodePools", contains("my-node-pool"))
             .body("data.find { it.attributes.name == 'test-kafka1'}.attributes.listeners", hasItem(allOf(
                 hasEntry("bootstrapServers", k1Bootstrap),
                 hasEntry("authType", "custom"))))
25 changes: 23 additions & 2 deletions install/002-deploy-console-kafka.sh
@@ -7,6 +7,12 @@ RESOURCE_PATH=${CONSOLE_INSTALL_PATH}/resources/kafka

 export NAMESPACE="${1?Please provide the deployment namespace}"
 export CLUSTER_DOMAIN="${2?Please provide the base domain name for Kafka listener ingress}"
+export MODE="${3:-kraft}"
+
+if [ "${MODE}" != "kraft" ] && [ "${MODE}" != "zk" ] ; then
+    echo "Unknown Kafka mode: '${MODE}'. Allowed values are [ 'kraft', 'zk' ]"
+    exit 1
+fi

 source ${CONSOLE_INSTALL_PATH}/_common.sh

@@ -20,7 +26,22 @@ fi

 export LISTENER_TYPE

-# Replace env variables
-${YQ} '(.. | select(tag == "!!str")) |= envsubst(ne)' ${RESOURCE_PATH}/console-kafka.kafka.yaml | ${KUBE} apply -n ${NAMESPACE} -f -
+if [ "${MODE}" == "kraft" ] ; then
+    if ! ${KUBE} get KafkaNodePool console-nodepool -n ${NAMESPACE} >/dev/null 2>&1 ; then
+        ${KUBE} delete Kafka console-kafka -n ${NAMESPACE} || true
+    fi
+
+    # Replace env variables
+    ${YQ} '(.. | select(tag == "!!str")) |= envsubst(ne)' ${RESOURCE_PATH}/console-kafka.kafka.yaml | ${KUBE} apply -n ${NAMESPACE} -f -
+    ${KUBE} apply -n ${NAMESPACE} -f ${RESOURCE_PATH}/console-nodepool.kafkanodepool.yaml
+else
+    if ${KUBE} get KafkaNodePool console-nodepool -n ${NAMESPACE} >/dev/null 2>&1 ; then
+        ${KUBE} delete Kafka console-kafka -n ${NAMESPACE} || true
+        ${KUBE} delete KafkaNodePool console-nodepool -n ${NAMESPACE} || true
+    fi
+
+    # Replace env variables
+    ${YQ} '(.. | select(tag == "!!str")) |= envsubst(ne)' ${RESOURCE_PATH}/console-kafka-zk.kafka.yaml | ${KUBE} apply -n ${NAMESPACE} -f -
+fi

 ${KUBE} apply -n ${NAMESPACE} -f ${RESOURCE_PATH}/console-kafka-user1.kafkauser.yaml
8 changes: 8 additions & 0 deletions install/README.md
@@ -38,6 +38,14 @@ a ConfigMap to export metrics in the way expected by the Prometheus instance cre
 002-deploy-console-kafka.sh ${TARGET_NAMESPACE} ${CLUSTER_DOMAIN}
 ```

+By default, the Kafka cluster will be created in KRaft mode (without ZooKeeper). To create a cluster that uses
+ZooKeeper instead, pass `zk` as an additional third argument. The value `kraft` may also be given to explicitly
+request a KRaft cluster.
+
+```shell
+002-deploy-console-kafka.sh ${TARGET_NAMESPACE} ${CLUSTER_DOMAIN} zk
+```
+
 ### Authorization

 In order to allow the necessary access for the console to function, a minimum level of authorization must be configured
65 changes: 65 additions & 0 deletions install/resources/kafka/console-kafka-zk.kafka.yaml
@@ -0,0 +1,65 @@
+apiVersion: kafka.strimzi.io/v1beta2
+kind: Kafka
+metadata:
+  name: console-kafka
+spec:
+  entityOperator:
+    topicOperator: {}
+    userOperator: {}
+  kafka:
+    authorization:
+      type: simple
+    config:
+      allow.everyone.if.no.acl.found: 'true'
+      default.replication.factor: 3
+      inter.broker.protocol.version: '3.7'
+      min.insync.replicas: 2
+      offsets.topic.replication.factor: 3
+      transaction.state.log.min.isr: 2
+      transaction.state.log.replication.factor: 3
+    listeners:
+      - name: secure
+        port: 9093
+        tls: true
+        type: ${LISTENER_TYPE}
+        authentication:
+          type: scram-sha-512
+        configuration:
+          bootstrap:
+            host: bootstrap.console-kafka.${CLUSTER_DOMAIN}
+            annotations:
+              streamshub.github.com/console-listener: 'true'
+          brokers:
+            - broker: 0
+              host: broker-0.console-kafka.${CLUSTER_DOMAIN}
+            - broker: 1
+              host: broker-1.console-kafka.${CLUSTER_DOMAIN}
+            - broker: 2
+              host: broker-2.console-kafka.${CLUSTER_DOMAIN}
+    replicas: 3
+    storage:
+      type: jbod
+      volumes:
+        - id: 0
+          type: persistent-claim
+          size: 10Gi
+          deleteClaim: false
+    metricsConfig:
+      type: jmxPrometheusExporter
+      valueFrom:
+        configMapKeyRef:
+          name: console-kafka-metrics
+          key: kafka-metrics-config.yml
+    version: 3.7.0
+  zookeeper:
+    replicas: 3
+    storage:
+      deleteClaim: false
+      size: 10Gi
+      type: persistent-claim
+    metricsConfig:
+      type: jmxPrometheusExporter
+      valueFrom:
+        configMapKeyRef:
+          name: console-kafka-metrics
+          key: zookeeper-metrics-config.yml
26 changes: 4 additions & 22 deletions install/resources/kafka/console-kafka.kafka.yaml
@@ -2,6 +2,9 @@ apiVersion: kafka.strimzi.io/v1beta2
 kind: Kafka
 metadata:
   name: console-kafka
+  annotations:
+    strimzi.io/kraft: enabled
+    strimzi.io/node-pools: enabled
 spec:
   entityOperator:
     topicOperator: {}
@@ -12,7 +15,6 @@ spec:
     config:
       allow.everyone.if.no.acl.found: 'true'
       default.replication.factor: 3
-      inter.broker.protocol.version: '3.6'
       min.insync.replicas: 2
       offsets.topic.replication.factor: 3
       transaction.state.log.min.isr: 2
@@ -36,30 +38,10 @@ spec:
           host: broker-1.console-kafka.${CLUSTER_DOMAIN}
         - broker: 2
           host: broker-2.console-kafka.${CLUSTER_DOMAIN}
-    replicas: 3
-    storage:
-      type: jbod
-      volumes:
-        - id: 0
-          type: persistent-claim
-          size: 10Gi
-          deleteClaim: false
     metricsConfig:
       type: jmxPrometheusExporter
       valueFrom:
         configMapKeyRef:
           name: console-kafka-metrics
           key: kafka-metrics-config.yml
-    version: 3.6.0
-  zookeeper:
-    replicas: 3
-    storage:
-      deleteClaim: false
-      size: 10Gi
-      type: persistent-claim
-    metricsConfig:
-      type: jmxPrometheusExporter
-      valueFrom:
-        configMapKeyRef:
-          name: console-kafka-metrics
-          key: zookeeper-metrics-config.yml
+    version: 3.7.0
18 changes: 18 additions & 0 deletions install/resources/kafka/console-nodepool.kafkanodepool.yaml
@@ -0,0 +1,18 @@
+apiVersion: kafka.strimzi.io/v1beta2
+kind: KafkaNodePool
+metadata:
+  name: console-nodepool
+  labels:
+    strimzi.io/cluster: console-kafka
+spec:
+  replicas: 3
+  roles:
+    - controller
+    - broker
+  storage:
+    type: jbod
+    volumes:
+      - deleteClaim: false
+        id: 0
+        size: 10Gi
+        type: persistent-claim
7 changes: 5 additions & 2 deletions ui/api/kafka/actions.ts
@@ -53,7 +53,7 @@ export async function getKafkaCluster(
 ): Promise<ClusterDetail | null> {
   const sp = new URLSearchParams({
     "fields[kafkas]":
-      "name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,listeners,conditions",
+      "name,namespace,creationTimestamp,status,kafkaVersion,nodes,controller,authorizedOperations,listeners,conditions,nodePools",
   });
   const kafkaClusterQuery = sp.toString();
   const url = `${process.env.BACKEND_URL}/api/kafkas/${clusterId}?${kafkaClusterQuery}`;
@@ -92,6 +92,7 @@ export async function getKafkaClusterKpis(
       cluster.attributes.namespace,
       cluster.attributes.name,
       cluster.attributes.controller.id,
+      cluster.attributes.nodePools?.join("|") ?? "",
     ),
   );

@@ -212,13 +213,14 @@ export async function getKafkaClusterMetrics(
   async function getRangeByNodeId(
     namespace: string,
     name: string,
+    nodePools: string,
     metric: ClusterMetric,
   ) {
     const start = new Date().getTime() - 1 * 60 * 60 * 1000;
     const end = new Date();
     const step = 60 * 1;
     const seriesRes = await prom!.rangeQuery(
-      cluster[metric](namespace, name),
+      cluster[metric](namespace, name, nodePools),
       start,
       end,
       step,
@@ -246,6 +248,7 @@ export async function getKafkaClusterMetrics(
       getRangeByNodeId(
         cluster.attributes.namespace,
         cluster.attributes.name,
+        cluster.attributes.nodePools?.join("|") ?? "",
         m,
       ),
     ),
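The `nodePools` attribute is optional in the API payload, so the call sites above fall back to an empty string. A minimal sketch of that derivation (the pool names here are hypothetical):

```ts
// Sketch of how the UI derives the node-pool fragment for the PromQL builders.
// A cluster with pools yields a "|"-joined alternation; one without yields "".
function nodePoolPattern(nodePools?: string[] | null): string {
  return nodePools?.join("|") ?? "";
}

console.log(nodePoolPattern(["console-nodepool"])); // "console-nodepool"
console.log(nodePoolPattern(["pool-a", "pool-b"])); // "pool-a|pool-b"
console.log(nodePoolPattern(null));                 // ""
```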
8 changes: 4 additions & 4 deletions ui/api/kafka/cluster.promql.ts
@@ -34,11 +34,11 @@ export const memory = (namespace: string, cluster: string) => `
   )
 `;

-export const volumeCapacity = (namespace: string, cluster: string) => `
+export const volumeCapacity = (namespace: string, cluster: string, nodePools: string) => `
   sum by (nodeId, __console_metric_name__) (
     label_replace(
       label_replace(
-        kubelet_volume_stats_capacity_bytes{namespace="${namespace}",persistentvolumeclaim=~"data(?:-\\\\d+)?-${cluster}-kafka-\\\\d+"},
+        kubelet_volume_stats_capacity_bytes{namespace="${namespace}",persistentvolumeclaim=~"data(?:-\\\\d+)?-${cluster}-(kafka|${nodePools})-\\\\d+"},
         "nodeId",
         "$1",
         "persistentvolumeclaim",
@@ -52,11 +52,11 @@ export const volumeCapacity = (namespace: string, cluster: string) => `
   )
 `;

-export const volumeUsed = (namespace: string, cluster: string) => `
+export const volumeUsed = (namespace: string, cluster: string, nodePools: string) => `
   sum by (nodeId, __console_metric_name__) (
     label_replace(
       label_replace(
-        kubelet_volume_stats_used_bytes{namespace="${namespace}",persistentvolumeclaim=~"data(?:-\\\\d+)?-${cluster}-kafka-\\\\d+"},
+        kubelet_volume_stats_used_bytes{namespace="${namespace}",persistentvolumeclaim=~"data(?:-\\\\d+)?-${cluster}-(kafka|${nodePools})-\\\\d+"},
         "nodeId",
         "$1",
         "persistentvolumeclaim",
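To see what the widened matcher accepts, the sketch below rebuilds the claim-name pattern as a plain JavaScript RegExp and tests it against hypothetical PVC names. (The PromQL template writes `\\\\d+` so that Prometheus receives `\\d+`; a regex built directly in JS needs only one level of escaping.)

```ts
// Sketch: the PVC matcher with the node-pool alternation, as a JS RegExp.
// Cluster and pool names below are hypothetical. Prometheus anchors =~
// matches, hence the explicit ^...$ here.
const cluster = "console-kafka";
const nodePools = "console-nodepool"; // already "|"-joined upstream
const pvc = new RegExp(`^data(?:-\\d+)?-${cluster}-(kafka|${nodePools})-\\d+$`);

console.log(pvc.test("data-console-kafka-kafka-0"));              // true: ZooKeeper-era broker PVC
console.log(pvc.test("data-0-console-kafka-console-nodepool-2")); // true: node pool PVC (JBOD volume 0)
console.log(pvc.test("data-console-kafka-zookeeper-0"));          // false: not a broker volume
```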
5 changes: 3 additions & 2 deletions ui/api/kafka/kpi.promql.ts
@@ -2,6 +2,7 @@ export const values = (
   namespace: string,
   cluster: string,
   controller: number,
+  nodePools: string,
 ) => `
   sum by (__console_metric_name__, nodeId) (
     label_replace(
@@ -101,7 +102,7 @@ or
   sum by (__console_metric_name__, nodeId) (
     label_replace(
       label_replace(
-        kubelet_volume_stats_capacity_bytes{namespace="${namespace}",persistentvolumeclaim=~"data(?:-\\\\d+)?-${cluster}-kafka-\\\\d+"},
+        kubelet_volume_stats_capacity_bytes{namespace="${namespace}",persistentvolumeclaim=~"data(?:-\\\\d+)?-${cluster}-(kafka|${nodePools})-\\\\d+"},
         "nodeId",
         "$1",
         "persistentvolumeclaim",
@@ -119,7 +120,7 @@ or
   sum by (__console_metric_name__, nodeId) (
     label_replace(
       label_replace(
-        kubelet_volume_stats_used_bytes{namespace="${namespace}",persistentvolumeclaim=~"data(?:-\\\\d+)?-${cluster}-kafka-\\\\d+"},
+        kubelet_volume_stats_used_bytes{namespace="${namespace}",persistentvolumeclaim=~"data(?:-\\\\d+)?-${cluster}-(kafka|${nodePools})-\\\\d+"},
         "nodeId",
         "$1",
         "persistentvolumeclaim",
1 change: 1 addition & 0 deletions ui/api/kafka/schema.ts
@@ -51,6 +51,7 @@ const ClusterDetailSchema = z.object({
       lastTransitionTime: z.string().optional(),
     }),
   ),
+  nodePools: z.array(z.string()).optional().nullable(),
 }),
});
export const ClusterResponse = z.object({
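Because the new field is `.optional().nullable()`, payloads from a backend that does not yet report node pools still validate. A trimmed-down stand-in for `ClusterDetailSchema` illustrating both cases:

```ts
import { z } from "zod";

// Reduced sketch of the attributes schema; only the new field is of interest.
const AttributesSketch = z.object({
  name: z.string(),
  nodePools: z.array(z.string()).optional().nullable(),
});

// A KRaft cluster reporting its pools parses...
AttributesSketch.parse({ name: "console-kafka", nodePools: ["console-nodepool"] });
// ...and so does an older payload with no nodePools key at all.
AttributesSketch.parse({ name: "console-kafka" });
```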