-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Metrics via API, per-cluster Prometheus Config, OCP monitoring support
Signed-off-by: Michael Edgar <[email protected]>
- Loading branch information
Showing
66 changed files
with
1,505 additions
and
950 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
51 changes: 51 additions & 0 deletions
51
api/src/main/java/com/github/streamshub/console/api/model/Metrics.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package com.github.streamshub.console.api.model; | ||
|
||
import java.time.Instant; | ||
import java.util.LinkedHashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import org.eclipse.microprofile.openapi.annotations.media.Schema; | ||
|
||
import com.fasterxml.jackson.annotation.JsonAnyGetter; | ||
import com.fasterxml.jackson.annotation.JsonFormat; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.fasterxml.jackson.annotation.JsonPropertyOrder; | ||
|
||
public record Metrics( | ||
@JsonProperty | ||
Map<String, List<Metrics.ValueMetric>> values, | ||
|
||
@JsonProperty | ||
Map<String, List<Metrics.RangeMetric>> ranges) { | ||
|
||
public Metrics() { | ||
this(new LinkedHashMap<>(), new LinkedHashMap<>()); | ||
} | ||
|
||
@Schema(additionalProperties = String.class) | ||
public static record ValueMetric( | ||
@JsonProperty | ||
String value, | ||
|
||
@JsonAnyGetter | ||
@Schema(hidden = true) | ||
Map<String, String> attributes) { | ||
} | ||
|
||
@Schema(additionalProperties = String.class) | ||
public static record RangeMetric( | ||
@JsonProperty | ||
@Schema(implementation = String[][].class) | ||
List<RangeEntry> range, | ||
|
||
@JsonAnyGetter | ||
@Schema(hidden = true) | ||
Map<String, String> attributes) { | ||
} | ||
|
||
@JsonFormat(shape = JsonFormat.Shape.ARRAY) | ||
@JsonPropertyOrder({"when", "value"}) | ||
public static record RangeEntry(Instant when, String value) { | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
187 changes: 187 additions & 0 deletions
187
api/src/main/java/com/github/streamshub/console/api/service/MetricsService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,187 @@ | ||
package com.github.streamshub.console.api.service; | ||
|
||
import java.io.IOException; | ||
import java.net.URI; | ||
import java.time.Instant; | ||
import java.time.temporal.ChronoUnit; | ||
import java.util.Base64; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionStage; | ||
import java.util.function.BiFunction; | ||
import java.util.function.Predicate; | ||
import java.util.function.Supplier; | ||
|
||
import jakarta.enterprise.context.ApplicationScoped; | ||
import jakarta.inject.Inject; | ||
import jakarta.json.JsonArray; | ||
import jakarta.json.JsonObject; | ||
import jakarta.ws.rs.WebApplicationException; | ||
import jakarta.ws.rs.client.ClientRequestContext; | ||
import jakarta.ws.rs.client.ClientRequestFilter; | ||
import jakarta.ws.rs.core.HttpHeaders; | ||
|
||
import org.eclipse.microprofile.rest.client.RestClientBuilder; | ||
import org.jboss.logging.Logger; | ||
|
||
import com.github.streamshub.console.api.model.Metrics; | ||
import com.github.streamshub.console.api.model.Metrics.RangeEntry; | ||
import com.github.streamshub.console.api.support.KafkaContext; | ||
import com.github.streamshub.console.api.support.PrometheusAPI; | ||
import com.github.streamshub.console.config.ConsoleConfig; | ||
import com.github.streamshub.console.config.KafkaClusterConfig; | ||
import com.github.streamshub.console.config.PrometheusConfig; | ||
import com.github.streamshub.console.config.PrometheusConfig.Type; | ||
|
||
import io.fabric8.kubernetes.client.KubernetesClient; | ||
import io.quarkus.tls.TlsConfiguration; | ||
import io.quarkus.tls.TlsConfigurationRegistry; | ||
|
||
import static java.util.stream.Collectors.groupingBy; | ||
import static java.util.stream.Collectors.mapping; | ||
import static java.util.stream.Collectors.toList; | ||
import static java.util.stream.Collectors.toMap; | ||
|
||
@ApplicationScoped | ||
public class MetricsService { | ||
|
||
public static final String METRIC_NAME = "__console_metric_name__"; | ||
|
||
@Inject | ||
Logger logger; | ||
|
||
@Inject | ||
TlsConfigurationRegistry certificates; | ||
|
||
@Inject | ||
KubernetesClient k8s; | ||
|
||
@Inject | ||
KafkaContext kafkaContext; | ||
|
||
ClientRequestFilter createAuthenticationFilter(PrometheusConfig config) { | ||
return new ClientRequestFilter() { | ||
@Override | ||
public void filter(ClientRequestContext requestContext) throws IOException { | ||
var authConfig = config.getAuthentication(); | ||
String authHeader = null; | ||
|
||
if (authConfig instanceof PrometheusConfig.Basic basic) { | ||
authHeader = "Basic " + Base64.getEncoder().encodeToString("%s:%s".formatted( | ||
basic.getUsername(), | ||
basic.getPassword()) | ||
.getBytes()); | ||
} else if (authConfig instanceof PrometheusConfig.Bearer bearer) { | ||
authHeader = "Bearer " + bearer.getToken(); | ||
} else if (config.getType() == Type.OPENSHIFT_MONITORING) { | ||
// ServiceAccount needs cluster role `cluster-monitoring-view` | ||
authHeader = "Bearer " + k8s.getConfiguration().getAutoOAuthToken(); | ||
} | ||
|
||
if (authHeader != null) { | ||
requestContext.getHeaders().add(HttpHeaders.AUTHORIZATION, authHeader); | ||
} | ||
} | ||
}; | ||
} | ||
|
||
public PrometheusAPI createClient(ConsoleConfig consoleConfig, KafkaClusterConfig clusterConfig) { | ||
PrometheusConfig prometheusConfig; | ||
|
||
if (clusterConfig.getMetricsSource() != null) { | ||
prometheusConfig = consoleConfig.getMetricsSources() | ||
.stream() | ||
.filter(source -> source.getName().equals(clusterConfig.getMetricsSource())) | ||
.findFirst() | ||
.orElseThrow(); | ||
|
||
var trustStore = certificates.getDefault().map(TlsConfiguration::getTrustStore).orElse(null); | ||
|
||
return RestClientBuilder.newBuilder() | ||
.baseUri(URI.create(prometheusConfig.getUrl())) | ||
.trustStore(trustStore) | ||
.register(createAuthenticationFilter(prometheusConfig)) | ||
.build(PrometheusAPI.class); | ||
} | ||
|
||
return null; | ||
} | ||
|
||
CompletionStage<Map<String, List<Metrics.ValueMetric>>> queryValues(String query) { | ||
PrometheusAPI prometheusAPI = kafkaContext.prometheus(); | ||
|
||
return fetchMetrics( | ||
() -> prometheusAPI.query(query, Instant.now()), | ||
(metric, attributes) -> { | ||
// ignore timestamp in first position | ||
String value = metric.getJsonArray("value").getString(1); | ||
return new Metrics.ValueMetric(value, attributes); | ||
}); | ||
} | ||
|
||
CompletionStage<Map<String, List<Metrics.RangeMetric>>> queryRanges(String query) { | ||
PrometheusAPI prometheusAPI = kafkaContext.prometheus(); | ||
|
||
return fetchMetrics( | ||
() -> { | ||
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); | ||
Instant start = now.minus(30, ChronoUnit.MINUTES); | ||
Instant end = now; | ||
return prometheusAPI.queryRange(query, start, end, "25"); | ||
}, | ||
(metric, attributes) -> { | ||
List<RangeEntry> values = metric.getJsonArray("values") | ||
.stream() | ||
.map(JsonArray.class::cast) | ||
.map(e -> new Metrics.RangeEntry( | ||
Instant.ofEpochMilli((long) (e.getJsonNumber(0).doubleValue() * 1000d)), | ||
e.getString(1) | ||
)) | ||
.toList(); | ||
|
||
return new Metrics.RangeMetric(values, attributes); | ||
}); | ||
} | ||
|
||
<M> CompletionStage<Map<String, List<M>>> fetchMetrics( | ||
Supplier<JsonObject> operation, | ||
BiFunction<JsonObject, Map<String, String>, M> builder) { | ||
|
||
return CompletableFuture.supplyAsync(() -> { | ||
try { | ||
return extractMetrics(operation.get(), builder); | ||
} catch (WebApplicationException wae) { | ||
logger.warnf("Failed to retrieve Kafka cluster metrics, status %d: %s", | ||
wae.getResponse().getStatus(), | ||
wae.getResponse().getEntity()); | ||
return Collections.emptyMap(); | ||
} catch (Exception e) { | ||
logger.warnf(e, "Failed to retrieve Kafka cluster metrics"); | ||
return Collections.emptyMap(); | ||
} | ||
}); | ||
} | ||
|
||
<M> Map<String, List<M>> extractMetrics(JsonObject response, | ||
BiFunction<JsonObject, Map<String, String>, M> builder) { | ||
|
||
return response.getJsonObject("data").getJsonArray("result") | ||
.stream() | ||
.map(JsonObject.class::cast) | ||
.map(metric -> { | ||
JsonObject meta = metric.getJsonObject("metric"); | ||
String metricName = meta.getString(METRIC_NAME); | ||
|
||
Map<String, String> attributes = meta.keySet() | ||
.stream() | ||
.filter(Predicate.not(METRIC_NAME::equals)) | ||
.map(key -> Map.entry(key, meta.getString(key))) | ||
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); | ||
|
||
return Map.entry(metricName, builder.apply(metric, attributes)); | ||
}) | ||
.collect(groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toList()))); | ||
} | ||
} |
Oops, something went wrong.