diff --git a/api/examples/kafka-ephemeral-ingress.yaml b/api/examples/kafka-ephemeral-ingress.yaml index 94f5694f3..4d2babcc4 100644 --- a/api/examples/kafka-ephemeral-ingress.yaml +++ b/api/examples/kafka-ephemeral-ingress.yaml @@ -6,6 +6,13 @@ spec: kafka: version: 3.5.1 replicas: 3 + resources: + limits: + cpu: 500m + memory: 1Gi + requests: + cpu: 75m + memory: 1Gi authorization: type: simple listeners: @@ -53,7 +60,12 @@ spec: inter.broker.protocol.version: "3.5" allow.everyone.if.no.acl.found: "true" storage: - type: ephemeral + volumes: + - type: "persistent-claim" + size: "1Gi" + deleteClaim: true + id: 0 + type: "jbod" metricsConfig: type: jmxPrometheusExporter valueFrom: @@ -62,8 +74,17 @@ spec: key: kafka-metrics-config.yml zookeeper: replicas: 3 + resources: + limits: + cpu: 200m + memory: 512Mi + requests: + cpu: 75m + memory: 512Mi storage: - type: ephemeral + type: persistent-claim + size: 1Gi + deleteClaim: false metricsConfig: type: jmxPrometheusExporter valueFrom: diff --git a/api/examples/metrics/prometheus-additional.yaml b/api/examples/metrics/prometheus-additional.yaml new file mode 100644 index 000000000..594123f26 --- /dev/null +++ b/api/examples/metrics/prometheus-additional.yaml @@ -0,0 +1,94 @@ +--- +apiVersion: v1 +kind: Secret +metadata: + name: additional-scrape-configs +type: Opaque +stringData: + prometheus-additional.yaml: | + - job_name: kubernetes-cadvisor + honor_labels: true + scrape_interval: 10s + scrape_timeout: 10s + metrics_path: /metrics/cadvisor + scheme: https + kubernetes_sd_configs: + - role: node + namespaces: + names: [] + bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token + tls_config: + ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt + insecure_skip_verify: true + relabel_configs: + - separator: ; + regex: __meta_kubernetes_node_label_(.+) + replacement: $1 + action: labelmap + - separator: ; + regex: (.*) + target_label: __address__ + replacement: kubernetes.default.svc:443 + action: replace + - source_labels: [__meta_kubernetes_node_name] + separator: ; + regex: (.+) + target_label: __metrics_path__ + replacement: /api/v1/nodes/${1}/proxy/metrics/cadvisor + action: replace + - source_labels: [__meta_kubernetes_node_name] + separator: ; + regex: (.*) + target_label: node_name + replacement: $1 + action: replace + - source_labels: [__meta_kubernetes_node_address_InternalIP] + separator: ; + regex: (.*) + target_label: node_ip + replacement: $1 + action: replace + metric_relabel_configs: + - source_labels: [container, __name__] + separator: ; + regex: POD;container_(network).* + target_label: container + replacement: $1 + action: replace + # - source_labels: [container] + # separator: ; + # regex: POD + # replacement: $1 + # action: drop + # - source_labels: [container] + # separator: ; + # regex: ^$ + # replacement: $1 + # action: drop + - source_labels: [__name__] + separator: ; + regex: container_(network_tcp_usage_total|tasks_state|memory_failures_total|network_udp_usage_total) + replacement: $1 + action: drop + + - job_name: kubernetes-nodes-kubelet + scrape_interval: 10s + scrape_timeout: 10s + scheme: https + kubernetes_sd_configs: + - role: node + namespaces: + names: [] + bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token + tls_config: + ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt + insecure_skip_verify: true + relabel_configs: + - action: labelmap + regex: __meta_kubernetes_node_label_(.+) + - target_label: __address__ + replacement: kubernetes.default.svc:443 + - source_labels: [__meta_kubernetes_node_name] + regex: (.+) + target_label: __metrics_path__ + replacement: /api/v1/nodes/${1}/proxy/metrics diff --git a/api/examples/metrics/prometheus-pod-monitors.yaml b/api/examples/metrics/prometheus-pod-monitors.yaml new file mode 100644 index 000000000..cc8b17151 --- /dev/null +++ b/api/examples/metrics/prometheus-pod-monitors.yaml @@ -0,0 +1,82 @@ +--- +apiVersion: monitoring.coreos.com/v1 +kind: PodMonitor +metadata: + name: cluster-operator-metrics + labels: + app: strimzi +spec: + selector: + matchLabels: + strimzi.io/kind: cluster-operator + namespaceSelector: + matchNames: + - myproject + podMetricsEndpoints: + - path: /metrics + port: http +--- +apiVersion: monitoring.coreos.com/v1 +kind: PodMonitor +metadata: + name: entity-operator-metrics + labels: + app: strimzi +spec: + selector: + matchLabels: + app.kubernetes.io/name: entity-operator + namespaceSelector: + matchNames: + - myproject + podMetricsEndpoints: + - path: /metrics + port: healthcheck +--- +apiVersion: monitoring.coreos.com/v1 +kind: PodMonitor +metadata: + name: kafka-resources-metrics + labels: + app: strimzi +spec: + selector: + matchExpressions: + - key: "strimzi.io/kind" + operator: In + values: ["Kafka"] + namespaceSelector: + matchNames: + - myproject + podMetricsEndpoints: + - path: /metrics + port: tcp-prometheus + relabelings: + - separator: ; + regex: __meta_kubernetes_pod_label_(strimzi_io_.+) + replacement: $1 + action: labelmap + - sourceLabels: [__meta_kubernetes_namespace] + separator: ; + regex: (.*) + targetLabel: namespace + replacement: $1 + action: replace + - sourceLabels: [__meta_kubernetes_pod_name] + separator: ; + regex: (.*) + targetLabel: kubernetes_pod_name + replacement: $1 + action: replace + - sourceLabels: [__meta_kubernetes_pod_node_name] + separator: ; + regex: (.*) + targetLabel: node_name + replacement: $1 + action: replace + - sourceLabels: [__meta_kubernetes_pod_host_ip] + separator: ; + regex: (.*) + targetLabel: node_ip + replacement: $1 + action: replace diff --git a/api/examples/metrics/prometheus-rules.yaml b/api/examples/metrics/prometheus-rules.yaml new file mode 100644 index 000000000..f50a04eeb --- /dev/null +++ b/api/examples/metrics/prometheus-rules.yaml @@ -0,0 +1,186 @@ +--- +apiVersion: monitoring.coreos.com/v1 +kind: PrometheusRule +metadata: + labels: + role: alert-rules + app: strimzi + name: prometheus-k8s-rules +spec: + groups: + - name: kafka + rules: + - alert: KafkaRunningOutOfSpace + expr: kubelet_volume_stats_available_bytes{persistentvolumeclaim=~"data(-[0-9]+)?-(.+)-kafka-[0-9]+"} * 100 / kubelet_volume_stats_capacity_bytes{persistentvolumeclaim=~"data(-[0-9]+)?-(.+)-kafka-[0-9]+"} < 15 + for: 10s + labels: + severity: warning + annotations: + summary: 'Kafka is running out of free disk space' + description: 'There are only {{ $value }} percent available at {{ $labels.persistentvolumeclaim }} PVC' + - alert: UnderReplicatedPartitions + expr: kafka_server_replicamanager_underreplicatedpartitions > 0 + for: 10s + labels: + severity: warning + annotations: + summary: 'Kafka under replicated partitions' + description: 'There are {{ $value }} under replicated partitions on {{ $labels.kubernetes_pod_name }}' + - alert: AbnormalControllerState + expr: sum(kafka_controller_kafkacontroller_activecontrollercount) by (strimzi_io_name) != 1 + for: 10s + labels: + severity: warning + annotations: + summary: 'Kafka abnormal controller state' + description: 'There are {{ $value }} active controllers in the cluster' + - alert: OfflinePartitions + expr: sum(kafka_controller_kafkacontroller_offlinepartitionscount) > 0 + for: 10s + labels: + severity: warning + annotations: + summary: 'Kafka offline partitions' + description: 'One or more partitions have no leader' + - alert: UnderMinIsrPartitionCount + expr: kafka_server_replicamanager_underminisrpartitioncount > 0 + for: 10s + labels: + severity: warning + annotations: + summary: 'Kafka under min ISR partitions' + description: 'There are {{ $value }} partitions under the min ISR on {{ $labels.kubernetes_pod_name }}' + - alert: OfflineLogDirectoryCount + expr: kafka_log_logmanager_offlinelogdirectorycount > 0 + for: 10s + labels: + severity: warning + annotations: + summary: 'Kafka offline log directories' + description: 'There are {{ $value }} offline log directories on {{ $labels.kubernetes_pod_name }}' + - alert: ScrapeProblem + expr: up{kubernetes_namespace!~"openshift-.+",kubernetes_pod_name=~".+-kafka-[0-9]+"} == 0 + for: 3m + labels: + severity: major + annotations: + summary: 'Prometheus unable to scrape metrics from {{ $labels.kubernetes_pod_name }}/{{ $labels.instance }}' + description: 'Prometheus was unable to scrape metrics from {{ $labels.kubernetes_pod_name }}/{{ $labels.instance }} for more than 3 minutes' + - alert: ClusterOperatorContainerDown + expr: count((container_last_seen{container="strimzi-cluster-operator"} > (time() - 90))) < 1 or absent(container_last_seen{container="strimzi-cluster-operator"}) + for: 1m + labels: + severity: major + annotations: + summary: 'Cluster Operator down' + description: 'The Cluster Operator has been down for longer than 90 seconds' + - alert: KafkaBrokerContainersDown + expr: absent(container_last_seen{container="kafka",pod=~".+-kafka-[0-9]+"}) + for: 3m + labels: + severity: major + annotations: + summary: 'All `kafka` containers down or in CrashLookBackOff status' + description: 'All `kafka` containers have been down or in CrashLookBackOff status for 3 minutes' + - alert: KafkaContainerRestartedInTheLast5Minutes + expr: count(count_over_time(container_last_seen{container="kafka"}[5m])) > 2 * count(container_last_seen{container="kafka",pod=~".+-kafka-[0-9]+"}) + for: 5m + labels: + severity: warning + annotations: + summary: 'One or more Kafka containers restarted too often' + description: 'One or more Kafka containers were restarted too often within the last 5 minutes' + - name: zookeeper + rules: + - alert: AvgRequestLatency + expr: zookeeper_avgrequestlatency > 10 + for: 10s + labels: + severity: warning + annotations: + summary: 'Zookeeper average request latency' + description: 'The average request latency is {{ $value }} on {{ $labels.kubernetes_pod_name }}' + - alert: OutstandingRequests + expr: zookeeper_outstandingrequests > 10 + for: 10s + labels: + severity: warning + annotations: + summary: 'Zookeeper outstanding requests' + description: 'There are {{ $value }} outstanding requests on {{ $labels.kubernetes_pod_name }}' + - alert: ZookeeperRunningOutOfSpace + expr: kubelet_volume_stats_available_bytes{persistentvolumeclaim=~"data-(.+)-zookeeper-[0-9]+"} < 5368709120 + for: 10s + labels: + severity: warning + annotations: + summary: 'Zookeeper is running out of free disk space' + description: 'There are only {{ $value }} bytes available at {{ $labels.persistentvolumeclaim }} PVC' + - alert: ZookeeperContainerRestartedInTheLast5Minutes + expr: count(count_over_time(container_last_seen{container="zookeeper"}[5m])) > 2 * count(container_last_seen{container="zookeeper",pod=~".+-zookeeper-[0-9]+"}) + for: 5m + labels: + severity: warning + annotations: + summary: 'One or more Zookeeper containers were restarted too often' + description: 'One or more Zookeeper containers were restarted too often within the last 5 minutes. This alert can be ignored when the Zookeeper cluster is scaling up' + - alert: ZookeeperContainersDown + expr: absent(container_last_seen{container="zookeeper",pod=~".+-zookeeper-[0-9]+"}) + for: 3m + labels: + severity: major + annotations: + summary: 'All `zookeeper` containers in the Zookeeper pods down or in CrashLookBackOff status' + description: 'All `zookeeper` containers in the Zookeeper pods have been down or in CrashLookBackOff status for 3 minutes' + - name: entityOperator + rules: + - alert: TopicOperatorContainerDown + expr: absent(container_last_seen{container="topic-operator",pod=~".+-entity-operator-.+"}) + for: 3m + labels: + severity: major + annotations: + summary: 'Container topic-operator in Entity Operator pod down or in CrashLookBackOff status' + description: 'Container topic-operator in Entity Operator pod has been or in CrashLookBackOff status for 3 minutes' + - alert: UserOperatorContainerDown + expr: absent(container_last_seen{container="user-operator",pod=~".+-entity-operator-.+"}) + for: 3m + labels: + severity: major + annotations: + summary: 'Container user-operator in Entity Operator pod down or in CrashLookBackOff status' + description: 'Container user-operator in Entity Operator pod have been down or in CrashLookBackOff status for 3 minutes' + - alert: EntityOperatorTlsSidecarContainerDown + expr: absent(container_last_seen{container="tls-sidecar",pod=~".+-entity-operator-.+"}) + for: 3m + labels: + severity: major + annotations: + summary: 'Container tls-sidecar Entity Operator pod down or in CrashLookBackOff status' + description: 'Container tls-sidecar in Entity Operator pod have been down or in CrashLookBackOff status for 3 minutes' + - name: kafkaExporter + rules: + - alert: UnderReplicatedPartition + expr: kafka_topic_partition_under_replicated_partition > 0 + for: 10s + labels: + severity: warning + annotations: + summary: 'Topic has under-replicated partitions' + description: 'Topic {{ $labels.topic }} has {{ $value }} under-replicated partition {{ $labels.partition }}' + - alert: TooLargeConsumerGroupLag + expr: kafka_consumergroup_lag > 1000 + for: 10s + labels: + severity: warning + annotations: + summary: 'Consumer group lag is too big' + description: 'Consumer group {{ $labels.consumergroup}} lag is too big ({{ $value }}) on topic {{ $labels.topic }}/partition {{ $labels.partition }}' + - alert: NoMessageForTooLong + expr: changes(kafka_topic_partition_current_offset{topic!="__consumer_offsets"}[10m]) == 0 + for: 10s + labels: + severity: warning + annotations: + summary: 'No message for 10 minutes' + description: 'There is no messages in topic {{ $labels.topic}}/partition {{ $labels.partition }} for 10 minutes' diff --git a/api/examples/metrics/prometheus.yaml b/api/examples/metrics/prometheus.yaml new file mode 100644 index 000000000..bf66c7248 --- /dev/null +++ b/api/examples/metrics/prometheus.yaml @@ -0,0 +1,75 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: prometheus-server + labels: + app: strimzi +rules: + - apiGroups: [""] + resources: + - nodes + - nodes/proxy + - services + - endpoints + - pods + verbs: ["get", "list", "watch"] + - apiGroups: + - extensions + resources: + - ingresses + verbs: ["get", "list", "watch"] + - nonResourceURLs: ["/metrics"] + verbs: ["get"] +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: prometheus-server + labels: + app: strimzi +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: prometheus-server + labels: + app: strimzi +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: prometheus-server +subjects: + - kind: ServiceAccount + name: prometheus-server + namespace: myproject +--- +apiVersion: monitoring.coreos.com/v1 +kind: Prometheus +metadata: + name: console-prometheus + labels: + app: strimzi +spec: + replicas: 1 + serviceAccountName: prometheus-server + podMonitorSelector: + matchLabels: + app: strimzi + serviceMonitorSelector: {} + resources: + requests: + memory: 400Mi + enableAdminAPI: false + ruleSelector: + matchLabels: + role: alert-rules + app: strimzi + alerting: + alertmanagers: + - namespace: myproject + name: alertmanager + port: alertmanager + additionalScrapeConfigs: + name: additional-scrape-configs + key: prometheus-additional.yaml diff --git a/api/pom.xml b/api/pom.xml index 4bae5de1e..5eea4147d 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -125,6 +125,10 @@ io.quarkus quarkus-resteasy-reactive-jackson + + io.quarkus + quarkus-rest-client-reactive + io.quarkus quarkus-jsonp diff --git a/api/src/main/java/com/github/eyefloaters/console/api/Annotations.java b/api/src/main/java/com/github/eyefloaters/console/api/Annotations.java index c428aaabb..7a73f4ccb 100644 --- a/api/src/main/java/com/github/eyefloaters/console/api/Annotations.java +++ b/api/src/main/java/com/github/eyefloaters/console/api/Annotations.java @@ -2,11 +2,26 @@ public enum Annotations { + /** + * Annotation that may be placed on a Strimzi Kafka resource to be hidden + * (ignored) by the console API server. Resources with this annotation will + * not appear in the Kafka cluster listing and attempts to fetch information + * using the describeCluster operation will result in a 404 client error. + */ + CONSOLE_HIDDEN("console-hidden"), + /** * Annotation to identify a listener in Strimzi Kafka resources to be used for * connections directly from the Console API. */ - CONSOLE_LISTENER("console-listener"); + CONSOLE_LISTENER("console-listener"), + + /** + * Annotation to identify a listener in Strimzi Kafka resources to be used for + * public connections. This may be used to differentiate a listener to be + * exposed via the KafkaCluster resource and published in the UI. + */ + EXPOSED_LISTENER("exposed-listener"); private static final String NAMESPACE = "eyefloaters.github.com"; private final String value; diff --git a/api/src/main/java/com/github/eyefloaters/console/api/KafkaClustersResource.java b/api/src/main/java/com/github/eyefloaters/console/api/KafkaClustersResource.java index 897034a77..22d6956c7 100644 --- a/api/src/main/java/com/github/eyefloaters/console/api/KafkaClustersResource.java +++ b/api/src/main/java/com/github/eyefloaters/console/api/KafkaClustersResource.java @@ -69,7 +69,10 @@ public Response listClusters( KafkaCluster.Fields.NAMESPACE, KafkaCluster.Fields.CREATION_TIMESTAMP, KafkaCluster.Fields.BOOTSTRAP_SERVERS, - KafkaCluster.Fields.AUTH_TYPE + KafkaCluster.Fields.AUTH_TYPE, + KafkaCluster.Fields.KAFKA_VERSION, + KafkaCluster.Fields.STATUS, + KafkaCluster.Fields.CONDITIONS, }, message = "list contains a value that is not valid or not available for the operation", payload = ErrorCategory.InvalidQueryParameter.class) @@ -85,7 +88,10 @@ public Response listClusters( KafkaCluster.Fields.NAMESPACE, KafkaCluster.Fields.CREATION_TIMESTAMP, KafkaCluster.Fields.BOOTSTRAP_SERVERS, - KafkaCluster.Fields.AUTH_TYPE + KafkaCluster.Fields.AUTH_TYPE, + KafkaCluster.Fields.KAFKA_VERSION, + KafkaCluster.Fields.STATUS, + KafkaCluster.Fields.CONDITIONS, })) List fields, @@ -126,7 +132,11 @@ public CompletionStage describeCluster( KafkaCluster.Fields.CONTROLLER, KafkaCluster.Fields.AUTHORIZED_OPERATIONS, KafkaCluster.Fields.BOOTSTRAP_SERVERS, - KafkaCluster.Fields.AUTH_TYPE + KafkaCluster.Fields.AUTH_TYPE, + KafkaCluster.Fields.METRICS, + KafkaCluster.Fields.KAFKA_VERSION, + KafkaCluster.Fields.STATUS, + KafkaCluster.Fields.CONDITIONS, }, payload = ErrorCategory.InvalidQueryParameter.class) @Parameter( @@ -144,7 +154,11 @@ public CompletionStage describeCluster( KafkaCluster.Fields.CONTROLLER, KafkaCluster.Fields.AUTHORIZED_OPERATIONS, KafkaCluster.Fields.BOOTSTRAP_SERVERS, - KafkaCluster.Fields.AUTH_TYPE + KafkaCluster.Fields.AUTH_TYPE, + KafkaCluster.Fields.METRICS, + KafkaCluster.Fields.KAFKA_VERSION, + KafkaCluster.Fields.STATUS, + KafkaCluster.Fields.CONDITIONS, })) List fields) { diff --git a/api/src/main/java/com/github/eyefloaters/console/api/model/Condition.java b/api/src/main/java/com/github/eyefloaters/console/api/model/Condition.java new file mode 100644 index 000000000..d8501470a --- /dev/null +++ b/api/src/main/java/com/github/eyefloaters/console/api/model/Condition.java @@ -0,0 +1,22 @@ +package com.github.eyefloaters.console.api.model; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; + +@JsonInclude(value = Include.NON_NULL) +public record Condition( + String status, + String reason, + String message, + String type, + String lastTransitionTime) { + + public Condition(io.strimzi.api.kafka.model.status.Condition condition) { + this(condition.getStatus(), + condition.getReason(), + condition.getMessage(), + condition.getType(), + condition.getLastTransitionTime()); + } + +} diff --git a/api/src/main/java/com/github/eyefloaters/console/api/model/KafkaCluster.java b/api/src/main/java/com/github/eyefloaters/console/api/model/KafkaCluster.java index 6350c4b98..310686236 100644 --- a/api/src/main/java/com/github/eyefloaters/console/api/model/KafkaCluster.java +++ b/api/src/main/java/com/github/eyefloaters/console/api/model/KafkaCluster.java @@ -35,6 +35,10 @@ public static class Fields { public static final String AUTHORIZED_OPERATIONS = "authorizedOperations"; public static final String BOOTSTRAP_SERVERS = "bootstrapServers"; public static final String AUTH_TYPE = "authType"; + public static final String METRICS = "metrics"; + public static final String KAFKA_VERSION = "kafkaVersion"; + public static final String STATUS = "status"; + public static final String CONDITIONS = "conditions"; static final Comparator ID_COMPARATOR = comparing(KafkaCluster::getId); @@ -56,7 +60,10 @@ BOOTSTRAP_SERVERS, comparing(KafkaCluster::getBootstrapServers, nullsLast(String + NAMESPACE + ", " + CREATION_TIMESTAMP + ", " + BOOTSTRAP_SERVERS + ", " - + AUTH_TYPE; + + AUTH_TYPE + ", " + + KAFKA_VERSION + ", " + + STATUS + ", " + + CONDITIONS + ", "; public static final String DESCRIBE_DEFAULT = NAME + ", " @@ -66,7 +73,10 @@ BOOTSTRAP_SERVERS, comparing(KafkaCluster::getBootstrapServers, nullsLast(String + CONTROLLER + ", " + AUTHORIZED_OPERATIONS + ", " + BOOTSTRAP_SERVERS + ", " - + AUTH_TYPE; + + AUTH_TYPE + ", " + + KAFKA_VERSION + ", " + + STATUS + ", " + + CONDITIONS + ", "; private Fields() { // Prevent instances @@ -120,6 +130,13 @@ public KafkaClusterResource(KafkaCluster data) { final List authorizedOperations; String bootstrapServers; // Strimzi Kafka CR only String authType; // Strimzi Kafka CR only + @Schema(readOnly = true, description = """ + Contains the set of metrics optionally retrieved only in a describe operation. + """) + Metrics metrics = new Metrics(); + String kafkaVersion; + String status; + List conditions; public KafkaCluster(String id, List nodes, Node controller, List authorizedOperations) { super(); @@ -225,4 +242,32 @@ public String getAuthType() { public void setAuthType(String authType) { this.authType = authType; } + + public Metrics getMetrics() { + return metrics; + } + + public String getKafkaVersion() { + return kafkaVersion; + } + + public void setKafkaVersion(String kafkaVersion) { + this.kafkaVersion = kafkaVersion; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public List getConditions() { + return conditions; + } + + public void setConditions(List conditions) { + this.conditions = conditions; + } } diff --git a/api/src/main/java/com/github/eyefloaters/console/api/model/Metrics.java b/api/src/main/java/com/github/eyefloaters/console/api/model/Metrics.java new file mode 100644 index 000000000..30bfceb06 --- /dev/null +++ b/api/src/main/java/com/github/eyefloaters/console/api/model/Metrics.java @@ -0,0 +1,51 @@ +package com.github.eyefloaters.console.api.model; + +import java.time.Instant; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.eclipse.microprofile.openapi.annotations.media.Schema; + +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; + +public record Metrics( + @JsonProperty + Map> values, + + @JsonProperty + Map> ranges) { + + public Metrics() { + this(new LinkedHashMap<>(), new LinkedHashMap<>()); + } + + @Schema(additionalProperties = String.class) + public static record ValueMetric( + @JsonProperty + String value, + + @JsonAnyGetter + @Schema(hidden = true) + Map attributes) { + } + + @Schema(additionalProperties = String.class) + public static record RangeMetric( + @JsonProperty + @Schema(implementation = String[][].class) + List range, + + @JsonAnyGetter + @Schema(hidden = true) + Map attributes) { + } + + @JsonFormat(shape = JsonFormat.Shape.ARRAY) + @JsonPropertyOrder({"when", "value"}) + public static record RangeEntry(Instant when, String value) { + } +} diff --git a/api/src/main/java/com/github/eyefloaters/console/api/service/KafkaClusterService.java b/api/src/main/java/com/github/eyefloaters/console/api/service/KafkaClusterService.java index ebfa55996..391c74797 100644 --- a/api/src/main/java/com/github/eyefloaters/console/api/service/KafkaClusterService.java +++ b/api/src/main/java/com/github/eyefloaters/console/api/service/KafkaClusterService.java @@ -1,12 +1,18 @@ package com.github.eyefloaters.console.api.service; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.List; import java.util.Locale; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Stream; @@ -17,12 +23,15 @@ import org.apache.kafka.clients.admin.DescribeClusterOptions; import org.apache.kafka.clients.admin.DescribeClusterResult; import org.apache.kafka.common.KafkaFuture; +import org.jboss.logging.Logger; import com.github.eyefloaters.console.api.Annotations; +import com.github.eyefloaters.console.api.model.Condition; import com.github.eyefloaters.console.api.model.KafkaCluster; import com.github.eyefloaters.console.api.model.Node; import com.github.eyefloaters.console.api.support.ListRequestContext; +import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.strimzi.api.kafka.model.Kafka; import io.strimzi.api.kafka.model.listener.KafkaListenerAuthentication; @@ -39,18 +48,27 @@ @ApplicationScoped public class KafkaClusterService { + @Inject + Logger logger; + @Inject SharedIndexInformer kafkaInformer; @Inject Supplier clientSupplier; + @Inject + MetricsService metricsService; + public List listClusters(ListRequestContext listSupport) { return kafkaInformer.getStore() .list() .stream() - .map(k -> consoleListener(k).map(l -> toKafkaCluster(k, l)).orElse(null)) - .filter(Objects::nonNull) + .filter(Predicate.not(k -> annotatedKafka(k, Annotations.CONSOLE_HIDDEN))) + .map(k -> exposedListener(k) + .map(l -> listenerStatus(k, l)) + .map(l -> toKafkaCluster(k, l)) + .orElseGet(() -> toKafkaCluster(k))) .map(listSupport::tally) .filter(listSupport::betweenCursors) .sorted(listSupport.getSortComparator()) @@ -70,44 +88,105 @@ public CompletionStage describeCluster(List fields) { result.clusterId(), result.controller(), result.nodes()) + .toCompletionStage() .thenApply(nothing -> new KafkaCluster( get(result::clusterId), get(result::nodes).stream().map(Node::fromKafkaModel).toList(), Node.fromKafkaModel(get(result::controller)), enumNames(get(result::authorizedOperations)))) .thenApply(this::addKafkaResourceData) - .toCompletionStage(); + .thenCompose(cluster -> addMetrics(cluster, fields)); } KafkaCluster toKafkaCluster(Kafka kafka, ListenerStatus listener) { + KafkaCluster cluster = new KafkaCluster(kafka.getStatus().getClusterId(), null, null, null); + setKafkaClusterProperties(cluster, kafka, listener); + return cluster; + } + + KafkaCluster toKafkaCluster(Kafka kafka) { KafkaCluster cluster = new KafkaCluster(kafka.getStatus().getClusterId(), null, null, null); cluster.setName(kafka.getMetadata().getName()); cluster.setNamespace(kafka.getMetadata().getNamespace()); cluster.setCreationTimestamp(kafka.getMetadata().getCreationTimestamp()); - cluster.setBootstrapServers(listener.getBootstrapServers()); - cluster.setAuthType(getAuthType(kafka, listener).orElse(null)); + setKafkaClusterStatus(cluster, kafka); return cluster; } KafkaCluster addKafkaResourceData(KafkaCluster cluster) { findCluster(kafkaInformer, cluster.getId()) - .ifPresent(kafka -> consoleListener(kafka) - .ifPresent(l -> { - cluster.setName(kafka.getMetadata().getName()); - cluster.setNamespace(kafka.getMetadata().getNamespace()); - cluster.setCreationTimestamp(kafka.getMetadata().getCreationTimestamp()); - cluster.setBootstrapServers(l.getBootstrapServers()); - cluster.setAuthType(getAuthType(kafka, l).orElse(null)); - })); + .ifPresent(kafka -> exposedListener(kafka) + .map(l -> listenerStatus(kafka, l)) + .ifPresent(l -> setKafkaClusterProperties(cluster, kafka, l))); return cluster; } + void setKafkaClusterProperties(KafkaCluster cluster, Kafka kafka, ListenerStatus listener) { + cluster.setName(kafka.getMetadata().getName()); + cluster.setNamespace(kafka.getMetadata().getNamespace()); + cluster.setCreationTimestamp(kafka.getMetadata().getCreationTimestamp()); + cluster.setBootstrapServers(listener.getBootstrapServers()); + cluster.setAuthType(getAuthType(kafka, listener).orElse(null)); + setKafkaClusterStatus(cluster, kafka); + } + + void setKafkaClusterStatus(KafkaCluster cluster, Kafka kafka) { + Optional.ofNullable(kafka.getStatus()) + .ifPresent(status -> { + cluster.setKafkaVersion(status.getKafkaVersion()); + Optional.ofNullable(status.getConditions()) + .ifPresent(conditions -> { + cluster.setConditions(conditions.stream().map(Condition::new).toList()); + + conditions.stream() + .filter(c -> "NotReady".equals(c.getType()) && "True".equals(c.getStatus())) + .findFirst() + .ifPresentOrElse( + c -> cluster.setStatus("NotReady"), + () -> cluster.setStatus("Ready")); + }); + }); + } + + CompletionStage addMetrics(KafkaCluster cluster, List fields) { + if (!fields.contains(KafkaCluster.Fields.METRICS)) { + return CompletableFuture.completedStage(cluster); + } + + if (metricsService.disabled()) { + logger.warnf("Kafka cluster metrics were requested, but Prometheus URL is not configured"); + return CompletableFuture.completedStage(cluster); + } + + String namespace = cluster.getNamespace(); + String name = cluster.getName(); + + try (var rangesStream = getClass().getResourceAsStream("/metrics/queries/kafkaCluster_ranges.promql"); + var valuesStream = getClass().getResourceAsStream("/metrics/queries/kafkaCluster_values.promql")) { + String rangeQuery = new String(rangesStream.readAllBytes(), StandardCharsets.UTF_8) + .formatted(namespace, name); + String valueQuery = new String(valuesStream.readAllBytes(), StandardCharsets.UTF_8) + .formatted(namespace, name); + + var rangeResults = metricsService.queryRanges(rangeQuery).toCompletableFuture(); + var valueResults = metricsService.queryValues(valueQuery).toCompletableFuture(); + + return CompletableFuture.allOf( + rangeResults.thenAccept(cluster.getMetrics().ranges()::putAll), + valueResults.thenAccept(cluster.getMetrics().values()::putAll)) + .thenApply(nothing -> cluster); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + public static Optional findCluster(SharedIndexInformer kafkaInformer, String clusterId) { return kafkaInformer.getStore() .list() .stream() .filter(k -> Objects.equals(clusterId, k.getStatus().getClusterId())) + .filter(Predicate.not(k -> annotatedKafka(k, Annotations.CONSOLE_HIDDEN))) .findFirst(); } @@ -115,11 +194,31 @@ public static Optional consoleListener(Kafka kafka) { return kafka.getSpec().getKafka().getListeners().stream() .filter(listener -> !KafkaListenerType.INTERNAL.equals(listener.getType())) .filter(KafkaClusterService::supportedAuthentication) - .sorted((l1, l2) -> Integer.compare(listenerSortKey(l1), listenerSortKey(l2))) + .sorted((l1, l2) -> Integer.compare( + listenerSortKey(l1, Annotations.CONSOLE_LISTENER), + listenerSortKey(l2, Annotations.CONSOLE_LISTENER))) .findFirst() .map(listener -> listenerStatus(kafka, listener)); } + /** + * Find the listener to be exposed via the API for the given Kafka instance. + * Listeners annotated as the (1) exposed-listener or the (2) console-listener + * will be preferred, in that order. + */ + public static Optional exposedListener(Kafka kafka) { + var comparator = Comparator + .comparingInt((GenericKafkaListener listener) -> + listenerSortKey(listener, Annotations.EXPOSED_LISTENER)) + .thenComparingInt((GenericKafkaListener listener) -> + listenerSortKey(listener, Annotations.CONSOLE_LISTENER)); + + return kafka.getSpec().getKafka().getListeners().stream() + .filter(listener -> !KafkaListenerType.INTERNAL.equals(listener.getType())) + .sorted(comparator) + .findFirst(); + } + static boolean supportedAuthentication(GenericKafkaListener listener) { KafkaListenerAuthentication listenerAuth = listener.getAuth(); @@ -140,15 +239,23 @@ static boolean supportedAuthentication(GenericKafkaListener listener) { } } - static int listenerSortKey(GenericKafkaListener listener) { - return annotatedListener(listener) ? -1 : 1; + static int listenerSortKey(GenericKafkaListener listener, Annotations listenerAnnotation) { + return annotatedListener(listener, listenerAnnotation) ? -1 : 1; + } + + static boolean annotatedKafka(Kafka kafka, Annotations listenerAnnotation) { + return Optional.ofNullable(kafka.getMetadata()) + .map(ObjectMeta::getAnnotations) + .map(annotations -> annotations.get(listenerAnnotation.value())) + .map(Boolean::valueOf) + .orElse(false); } - static boolean annotatedListener(GenericKafkaListener listener) { + static boolean annotatedListener(GenericKafkaListener listener, Annotations listenerAnnotation) { return Optional.ofNullable(listener.getConfiguration()) .map(GenericKafkaListenerConfiguration::getBootstrap) .map(config -> config.getAnnotations()) - .map(annotations -> annotations.get(Annotations.CONSOLE_LISTENER.value())) + .map(annotations -> annotations.get(listenerAnnotation.value())) .map(Boolean::valueOf) .orElse(false); } diff --git a/api/src/main/java/com/github/eyefloaters/console/api/service/MetricsService.java b/api/src/main/java/com/github/eyefloaters/console/api/service/MetricsService.java new file mode 100644 index 000000000..811a9a227 --- /dev/null +++ b/api/src/main/java/com/github/eyefloaters/console/api/service/MetricsService.java @@ -0,0 +1,125 @@ +package com.github.eyefloaters.console.api.service; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.BiFunction; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.json.JsonArray; +import jakarta.json.JsonObject; +import jakarta.ws.rs.WebApplicationException; + +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.rest.client.inject.RestClient; +import org.jboss.logging.Logger; + +import com.github.eyefloaters.console.api.model.Metrics; +import com.github.eyefloaters.console.api.model.Metrics.RangeEntry; +import com.github.eyefloaters.console.api.support.PrometheusAPI; + +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; + +@ApplicationScoped +public class MetricsService { + + public static final String METRIC_NAME = "__console_metric_name__"; + + @Inject + Logger logger; + + @Inject + @ConfigProperty(name = "console.metrics.prometheus-url") + Supplier> prometheusUrl; + + @Inject + @RestClient + PrometheusAPI prometheusAPI; + + public boolean disabled() { + return prometheusUrl.get().isEmpty(); + } + + CompletionStage>> queryValues(String query) { + String time = Double.toString(System.currentTimeMillis() / 1000d); + + return fetchMetrics( + () -> prometheusAPI.query(query, time), + (metric, attributes) -> { + // ignore timestamp in first position + String value = metric.getJsonArray("value").getString(1); + return new Metrics.ValueMetric(value, attributes); + }); + } + + CompletionStage>> queryRanges(String query) { + Instant now = Instant.now(); + String start = Double.toString(now.minus(30, ChronoUnit.MINUTES).toEpochMilli() / 1000d); + String end = Double.toString(now.toEpochMilli() / 1000d); + + return fetchMetrics( + () -> prometheusAPI.queryRange(query, start, end, "60"), + (metric, attributes) -> { + List values = metric.getJsonArray("values") + .stream() + .map(JsonArray.class::cast) + .map(e -> new Metrics.RangeEntry( + Instant.ofEpochMilli((long) (e.getJsonNumber(0).doubleValue() * 1000d)), + e.getString(1) + )) + .toList(); + + return new Metrics.RangeMetric(values, attributes); + }); + } + + CompletionStage>> fetchMetrics( + Supplier operation, + BiFunction, M> builder) { + + return CompletableFuture.supplyAsync(() -> { + try { + return extractMetrics(operation.get(), builder); + } catch (WebApplicationException wae) { + logger.warnf("Failed to retrieve Kafka cluster metrics: %s", + wae.getResponse().getEntity()); + return Collections.emptyMap(); + } catch (Exception e) { + logger.warnf(e, "Failed to retrieve Kafka cluster metrics"); + return Collections.emptyMap(); + } + }); + } + + Map> extractMetrics(JsonObject response, + BiFunction, M> builder) { + + return response.getJsonObject("data").getJsonArray("result") + .stream() + .map(JsonObject.class::cast) + .map(metric -> { + JsonObject meta = metric.getJsonObject("metric"); + String metricName = meta.getString(METRIC_NAME); + + Map attributes = meta.keySet() + .stream() + .filter(Predicate.not(METRIC_NAME::equals)) + .map(key -> Map.entry(key, meta.getString(key))) + .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); + + return Map.entry(metricName, builder.apply(metric, attributes)); + }) + .collect(groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toList()))); + } +} diff --git a/api/src/main/java/com/github/eyefloaters/console/api/service/TopicService.java b/api/src/main/java/com/github/eyefloaters/console/api/service/TopicService.java index 10697bc42..a9b5656f5 100644 --- a/api/src/main/java/com/github/eyefloaters/console/api/service/TopicService.java +++ b/api/src/main/java/com/github/eyefloaters/console/api/service/TopicService.java @@ -10,6 +10,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; @@ -30,8 +31,8 @@ import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.DescribeLogDirsOptions; import org.apache.kafka.clients.admin.DescribeTopicsOptions; -import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.NewPartitionReassignment; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicListing; @@ -68,6 +69,15 @@ public class TopicService { List.of(OffsetSpec.earliest(), OffsetSpec.latest(), OffsetSpec.maxTimestamp()); private static final Predicate CONFIG_SORT = Pattern.compile("^-?configs\\..+$").asMatchPredicate(); + private static final Set REQUIRE_DESCRIBE = Set.of( + Topic.Fields.PARTITIONS, + Topic.Fields.AUTHORIZED_OPERATIONS, + Topic.Fields.RECORD_COUNT, + Topic.Fields.TOTAL_LEADER_LOG_BYTES); + private static final Set REQUIRE_PARTITIONS = Set.of( + Topic.Fields.PARTITIONS, + Topic.Fields.RECORD_COUNT, + Topic.Fields.TOTAL_LEADER_LOG_BYTES); @Inject Logger logger; @@ -383,14 +393,11 @@ CompletableFuture maybeDescribeConfigs(Admin adminClient, Map } CompletableFuture maybeDescribeTopics(Admin adminClient, Map topics, List fields, String offsetSpec) { - if (fields.contains(Topic.Fields.PARTITIONS) - || fields.contains(Topic.Fields.AUTHORIZED_OPERATIONS) - || fields.contains(Topic.Fields.RECORD_COUNT)) { + if (REQUIRE_DESCRIBE.stream().anyMatch(fields::contains)) { return describeTopics(adminClient, topics.keySet(), fields, offsetSpec) .thenApply(descriptions -> { descriptions.forEach((id, either) -> { - if (fields.contains(Topic.Fields.PARTITIONS) - || fields.contains(Topic.Fields.RECORD_COUNT)) { + if (REQUIRE_PARTITIONS.stream().anyMatch(fields::contains)) { topics.get(id).addPartitions(either); } if (fields.contains(Topic.Fields.AUTHORIZED_OPERATIONS)) { diff --git a/api/src/main/java/com/github/eyefloaters/console/api/support/PrometheusAPI.java b/api/src/main/java/com/github/eyefloaters/console/api/support/PrometheusAPI.java new file mode 100644 index 000000000..167a018a6 --- /dev/null +++ b/api/src/main/java/com/github/eyefloaters/console/api/support/PrometheusAPI.java @@ -0,0 +1,45 @@ +package com.github.eyefloaters.console.api.support; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.json.JsonObject; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.MediaType; + +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; + +@ApplicationScoped +@RegisterRestClient(configKey = "prometheus") +@Path("/api/v1") +public interface PrometheusAPI { + + /** + * Evaluates an instant query at a single point in time + * + * @see Instant queries + */ + @Path("query") + @POST + @Consumes(MediaType.APPLICATION_FORM_URLENCODED) + @Produces(MediaType.APPLICATION_JSON) + JsonObject query(@QueryParam("query") String query, + @QueryParam("time") String time); + + /** + * Evaluates an expression query over a range of time + * + * @see Range queries + */ + @Path("query_range") + @POST + @Consumes(MediaType.APPLICATION_FORM_URLENCODED) + @Produces(MediaType.APPLICATION_JSON) + JsonObject queryRange(@QueryParam("query") String query, + @QueryParam("start") String start, + @QueryParam("end") String end, + @QueryParam("step") String step); + +} diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties index be1c02d50..c00536b37 100644 --- a/api/src/main/resources/application.properties +++ b/api/src/main/resources/application.properties @@ -40,6 +40,11 @@ quarkus.swagger-ui.enable=true quarkus.swagger-ui.always-include=true quarkus.swagger-ui.title=Console API +# Prometheus client properties +prometheus/mp-rest/url=${console.metrics.prometheus-url} +prometheus/mp-rest/connectTimeout=5000 +prometheus/mp-rest/readTimeout=10000 + quarkus.log.category."org.apache.kafka".level=WARN quarkus.jacoco.reuse-data-file=true @@ -63,6 +68,8 @@ quarkus.arc.unremovable-types=com.github.eyefloaters.console.api.** %dev.quarkus.log.category."com.github.eyefloaters.console".level=DEBUG ######## +# Filler URL used to build the Quarkus REST client in mocks +%testplain.console.metrics.prometheus-url=http://prometheus.example.com %testplain.quarkus.http.auth.proactive=false %testplain.quarkus.http.auth.permission."oidc".policy=permit %testplain.quarkus.log.category."io.vertx.core.impl.BlockedThreadChecker".level=OFF diff --git a/api/src/main/resources/metrics/queries/kafkaCluster_ranges.promql b/api/src/main/resources/metrics/queries/kafkaCluster_ranges.promql new file mode 100644 index 000000000..84534c728 --- /dev/null +++ b/api/src/main/resources/metrics/queries/kafkaCluster_ranges.promql @@ -0,0 +1,83 @@ + sum by (nodeId, __console_metric_name__) ( + label_replace( + label_replace( + rate(container_cpu_usage_seconds_total{namespace="%1$s",pod=~"%2$s-kafka-\\d+"}[1m]), + "nodeId", + "$1", + "pod", + ".+-kafka-(\\d+)" + ), + "__console_metric_name__", + "cpu_usage_seconds", + "", + "" + ) + ) +or + sum by (nodeId, __console_metric_name__) ( + label_replace( + label_replace( + container_memory_usage_bytes{namespace="%1$s",pod=~"%2$s-kafka-\\d+"}, + "nodeId", + "$1", + "pod", + ".+-kafka-(\\d+)" + ), + "__console_metric_name__", + "memory_usage_bytes", + "", + "" + ) + ) +or + sum by (__console_metric_name__) ( + label_replace( + irate(kafka_server_brokertopicmetrics_bytesin_total{topic!="",namespace="%1$s",pod=~"%2$s-kafka-\\d+"}[5m]), + "__console_metric_name__", + "incoming_byte_rate", + "", + "" + ) + ) +or + sum by (__console_metric_name__) ( + label_replace( + irate(kafka_server_brokertopicmetrics_bytesout_total{topic!="",namespace="%1$s",pod=~"%2$s-kafka-\\d+"}[5m]), + "__console_metric_name__", + "outgoing_byte_rate", + "", + "" + ) + ) +or + sum by (nodeId, __console_metric_name__) ( + label_replace( + label_replace( + kubelet_volume_stats_capacity_bytes{namespace="%1$s",persistentvolumeclaim=~"data(?:-\\d+)?-%2$s-kafka-\\d+"}, + "nodeId", + "$1", + "persistentvolumeclaim", + ".+-kafka-(\\d+)" + ), + "__console_metric_name__", + "volume_stats_capacity_bytes", + "", + "" + ) + ) +or + sum by (nodeId, __console_metric_name__) ( + label_replace( + label_replace( + kubelet_volume_stats_used_bytes{namespace="%1$s",persistentvolumeclaim=~"data(?:-\\d+)?-%2$s-kafka-\\d+"}, + "nodeId", + "$1", + "persistentvolumeclaim", + ".+-kafka-(\\d+)" + ), + "__console_metric_name__", + "volume_stats_used_bytes", + "", + "" + ) + ) diff --git a/api/src/main/resources/metrics/queries/kafkaCluster_values.promql b/api/src/main/resources/metrics/queries/kafkaCluster_values.promql new file mode 100644 index 000000000..047a6b052 --- /dev/null +++ b/api/src/main/resources/metrics/queries/kafkaCluster_values.promql @@ -0,0 +1,49 @@ +sum by (__console_metric_name__, nodeId) ( + label_replace( + label_replace( + kafka_server_kafkaserver_brokerstate{namespace="%1$s",pod=~"%2$s-kafka-\\d+"} > 0, + "nodeId", + "$1", + "pod", + ".+-kafka-(\\d+)" + ), + "__console_metric_name__", + "broker_state", + "", + "" + ) +) + +or + +sum by (__console_metric_name__) ( + label_replace( + kafka_controller_kafkacontroller_globaltopiccount{namespace="%1$s",pod=~"%2$s-kafka-\\d+"} > 0, + "__console_metric_name__", + "total_topics", + "", + "" + ) +) + +or + +sum by (__console_metric_name__) ( + label_replace( + kafka_controller_kafkacontroller_globalpartitioncount{namespace="%1$s",pod=~"%2$s-kafka-\\d+"} > 0, + "__console_metric_name__", + "total_partitions", + "", + "" + ) +) + +or + +label_replace( + count(sum by (topic) (kafka_cluster_partition_underreplicated{namespace="%1$s",pod=~"%2$s-kafka-\\d+"})), + "__console_metric_name__", + "underreplicated_topics", + "", + "" +) \ No newline at end of file diff --git a/api/src/test/java/com/github/eyefloaters/console/api/KafkaClustersResourceIT.java b/api/src/test/java/com/github/eyefloaters/console/api/KafkaClustersResourceIT.java index c7e01d49a..ca3986fe6 100644 --- a/api/src/test/java/com/github/eyefloaters/console/api/KafkaClustersResourceIT.java +++ b/api/src/test/java/com/github/eyefloaters/console/api/KafkaClustersResourceIT.java @@ -3,6 +3,7 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.URI; +import java.time.Instant; import java.util.Base64; import java.util.HashMap; import java.util.List; @@ -21,6 +22,8 @@ import jakarta.json.Json; import jakarta.json.JsonObject; import jakarta.json.JsonString; +import jakarta.ws.rs.InternalServerErrorException; +import jakarta.ws.rs.core.Response; import jakarta.ws.rs.core.Response.Status; import org.apache.kafka.clients.CommonClientConfigs; @@ -28,6 +31,8 @@ import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.rest.client.inject.RestClient; +import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -36,7 +41,9 @@ import org.mockito.Mockito; import com.github.eyefloaters.console.api.model.ListFetchParams; +import com.github.eyefloaters.console.api.service.MetricsService; import com.github.eyefloaters.console.api.support.ErrorCategory; +import com.github.eyefloaters.console.api.support.PrometheusAPI; import com.github.eyefloaters.console.kafka.systemtest.TestPlainProfile; import com.github.eyefloaters.console.kafka.systemtest.deployment.DeploymentManager; import com.github.eyefloaters.console.test.AdminClientSpy; @@ -61,6 +68,7 @@ import static java.util.Objects.isNull; import static java.util.function.Predicate.not; import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -69,6 +77,11 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; @QuarkusTest @QuarkusTestResource(KubernetesServerTestResource.class) @@ -113,15 +126,29 @@ void setup() throws IOException { client.resources(Kafka.class).delete(); client.resources(Kafka.class) - .resource(utils.buildKafkaResource("test-kafka1", clusterId1, bootstrapServers, - new KafkaListenerAuthenticationCustomBuilder() - .withSasl() - .addToListenerConfig("sasl.enabled.mechanisms", "oauthbearer") - .build())) + .resource(new KafkaBuilder(utils.buildKafkaResource("test-kafka1", clusterId1, bootstrapServers, + new KafkaListenerAuthenticationCustomBuilder() + .withSasl() + .addToListenerConfig("sasl.enabled.mechanisms", "oauthbearer") + .build())) + .editOrNewStatus() + .addNewCondition() + .withType("Ready") + .withStatus("True") + .endCondition() + .endStatus() + .build()) .create(); // Second cluster is offline/non-existent client.resources(Kafka.class) - .resource(utils.buildKafkaResource("test-kafka2", clusterId2, randomBootstrapServers)) + .resource(new KafkaBuilder(utils.buildKafkaResource("test-kafka2", clusterId2, randomBootstrapServers)) + .editOrNewStatus() + .addNewCondition() + .withType("NotReady") + .withStatus("True") + .endCondition() + .endStatus() + .build()) .create(); // Wait for the informer cache to be populated with all Kafka CRs @@ -153,7 +180,9 @@ void testListClusters() { .body("data.attributes.bootstrapServers", containsInAnyOrder( bootstrapServers.getHost() + ":" + bootstrapServers.getPort(), randomBootstrapServers.getHost() + ":" + randomBootstrapServers.getPort())) - .body("data.attributes.authType", containsInAnyOrder(equalTo("custom"), nullValue())); + .body("data.attributes.authType", containsInAnyOrder(equalTo("custom"), nullValue())) + .body("data.find { it.attributes.name == 'test-kafka1'}.attributes.status", is("Ready")) + .body("data.find { it.attributes.name == 'test-kafka2'}.attributes.status", is("NotReady")); } @Test @@ -450,16 +479,7 @@ void testListClustersWithUnexpectedPageCursorData() { @Test void testDescribeClusterWithCustomOAuth() { - Map clientConfig = new HashMap<>(); - - AdminClientSpy.install(config -> { - clientConfig.putAll(config); - - Map newConfig = new HashMap<>(config); - // Disable SASL since the Kafka cluster is not actually using it - newConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name); - return newConfig; - }, client -> { /* No-op */ }); + Map clientConfig = mockAdminClient(); whenRequesting(req -> req .auth() @@ -501,16 +521,7 @@ void testDescribeClusterWithOAuthAndCertificates() { client.resources(Kafka.class).resource(kafka).create(); - Map clientConfig = new HashMap<>(); - - AdminClientSpy.install(config -> { - clientConfig.putAll(config); - - Map newConfig = new HashMap<>(config); - // Disable SASL since the Kafka cluster is not actually using it - newConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name); - return newConfig; - }, client -> { /* No-op */ }); + Map clientConfig = mockAdminClient(); whenRequesting(req -> req .auth() @@ -581,4 +592,148 @@ void testDescribeClusterWithNoSuchCluster() { .body("errors.status", contains("404")) .body("errors.code", contains("4041")); } + + @Test + void testDescribeClusterWithMetrics() { + mockAdminClient(); // Allow Oauth HTTP requests to drop auth in back-end + + JsonObject mockValues = Json.createObjectBuilder() + .add("data", Json.createObjectBuilder() + .add("result", Json.createArrayBuilder() + .add(Json.createObjectBuilder() + .add("metric", Json.createObjectBuilder() + .add(MetricsService.METRIC_NAME, "test_value_metric") + .add("custom_attribute", "custom_attribute_value")) + .add("value", Json.createArrayBuilder() + .add(0.999) + .add("999"))))) + .build(); + JsonObject mockRanges = Json.createObjectBuilder() + .add("data", Json.createObjectBuilder() + .add("result", Json.createArrayBuilder() + .add(Json.createObjectBuilder() + .add("metric", Json.createObjectBuilder() + .add(MetricsService.METRIC_NAME, "test_range_metric") + .add("custom_attribute", "custom_attribute_value")) + .add("values", Json.createArrayBuilder() + .add(Json.createArrayBuilder() + .add(0.123) + .add("0")) + .add(Json.createArrayBuilder() + .add(1.456) + .add("1")) + .add(Json.createArrayBuilder() + .add(2.789) + .add("2")) + )))) + .build(); + + PrometheusAPI prometheusMock = Mockito.mock(PrometheusAPI.class); + doReturn(mockValues).when(prometheusMock).query(anyString(), anyString()); + doReturn(mockRanges).when(prometheusMock).queryRange(anyString(), anyString(), anyString(), anyString()); + QuarkusMock.installMockForType(prometheusMock, PrometheusAPI.class, RestClient.LITERAL); + + whenRequesting(req -> req + .auth() + .oauth2("my-access-token") + .queryParam("fields[kafkas]", "name,metrics") + .get("{clusterId}", clusterId1)) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data.id", is(clusterId1)) + .body("data.attributes.name", is("test-kafka1")) + .body("data.attributes.metrics", allOf(hasKey("values"), hasKey("ranges"))) + .body("data.attributes.metrics.values.test_value_metric[0].custom_attribute", is("custom_attribute_value")) + .body("data.attributes.metrics.values.test_value_metric[0].value", is("999")) + .body("data.attributes.metrics.ranges.test_range_metric[0].custom_attribute", is("custom_attribute_value")) + .body("data.attributes.metrics.ranges.test_range_metric[0].range", contains( + contains(Instant.ofEpochMilli(123).toString(), "0"), + contains(Instant.ofEpochMilli(1456).toString(), "1"), + contains(Instant.ofEpochMilli(2789).toString(), "2"))); + } + + @Test + void testDescribeClusterWithMetricsErrors() { + mockAdminClient(); // Allow Oauth HTTP requests to drop auth in back-end + + PrometheusAPI prometheusMock = Mockito.mock(PrometheusAPI.class); + + doThrow(new InternalServerErrorException(Response.serverError() + .entity("EXPECTED TEST EXCEPTION - METRICS HTTP 500 ERROR").build())) + .when(prometheusMock).query(anyString(), anyString()); + + doThrow(new RuntimeException("EXPECTED TEST EXCEPTION - METRICS RUNTIME ERROR")) + .when(prometheusMock).queryRange(anyString(), anyString(), anyString(), anyString()); + + QuarkusMock.installMockForType(prometheusMock, PrometheusAPI.class, RestClient.LITERAL); + + whenRequesting(req -> req + .auth() + .oauth2("my-access-token") + .queryParam("fields[kafkas]", "name,metrics") + .get("{clusterId}", clusterId1)) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data.id", is(clusterId1)) + .body("data.attributes.name", is("test-kafka1")) + .body("data.attributes.metrics", allOf(hasKey("values"), hasKey("ranges"))) + .body("data.attributes.metrics.values", is(Matchers.anEmptyMap())) + .body("data.attributes.metrics.ranges", is(Matchers.anEmptyMap())); + + verify(prometheusMock, times(1)).query(anyString(), anyString()); + verify(prometheusMock, times(1)).queryRange(anyString(), anyString(), anyString(), anyString()); + } + + @Test + void testDescribeClusterWithMetricsDisabled() { + mockAdminClient(); // Allow Oauth HTTP requests to drop auth in back-end + + PrometheusAPI prometheusMock = Mockito.spy(PrometheusAPI.class); + QuarkusMock.installMockForType(prometheusMock, PrometheusAPI.class, RestClient.LITERAL); + + String prometheusUrl = System.getProperty("console.metrics.prometheus-url"); + + try { + System.setProperty("console.metrics.prometheus-url", ""); + + whenRequesting(req -> req + .auth() + .oauth2("my-access-token") + .queryParam("fields[kafkas]", "name,metrics") + .get("{clusterId}", clusterId1)) + .assertThat() + .statusCode(is(Status.OK.getStatusCode())) + .body("data.id", is(clusterId1)) + .body("data.attributes.name", is("test-kafka1")) + .body("data.attributes.metrics", allOf(hasKey("values"), hasKey("ranges"))) + .body("data.attributes.metrics.values", is(Matchers.anEmptyMap())) + .body("data.attributes.metrics.ranges", is(Matchers.anEmptyMap())); + } finally { + if (prometheusUrl != null) { + System.setProperty("console.metrics.prometheus-url", prometheusUrl); + } else { + System.clearProperty("console.metrics.prometheus-url"); + } + } + + verify(prometheusMock, times(0)).query(anyString(), anyString()); + verify(prometheusMock, times(0)).queryRange(anyString(), anyString(), anyString(), anyString()); + } + + // Helper methods + + static Map mockAdminClient() { + Map clientConfig = new HashMap<>(); + + AdminClientSpy.install(config -> { + clientConfig.putAll(config); + + Map newConfig = new HashMap<>(config); + // Disable SASL since the Kafka cluster is not actually using it + newConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.PLAINTEXT.name); + return newConfig; + }, client -> { /* No-op */ }); + + return clientConfig; + } }