Skip to content

Commit

Permalink
Re-factor out metric retrieval duplication
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Edgar <[email protected]>
  • Loading branch information
MikeEdgar committed Nov 27, 2023
1 parent 8a33372 commit e79c05b
Showing 1 changed file with 73 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublisher;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
Expand All @@ -15,8 +16,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
Expand All @@ -31,6 +32,11 @@
import com.github.eyefloaters.console.api.model.Metrics;
import com.github.eyefloaters.console.api.model.Metrics.RangeEntry;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

@ApplicationScoped
public class MetricsService {

Expand All @@ -48,117 +54,88 @@ public boolean isDisabled() {
}

CompletionStage<Map<String, List<Metrics.ValueMetric>>> queryValues(String query) {
HttpClient client = HttpClient.newBuilder()
.build();

Instant now = Instant.now();
String time = Double.toString(now.toEpochMilli() / 1000d);

HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(prometheusUrl.get() + "/api/v1/query"))
.header("Content-type", "application/x-www-form-urlencoded")
.POST(HttpRequest.BodyPublishers.ofString(
"query=" + URLEncoder.encode(query, StandardCharsets.UTF_8) + "&" +
"time=" + time))
.build();

return client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenApply(response -> {
if (response.statusCode() != HttpURLConnection.HTTP_OK) {
logger.warnf("Failed to retrieve Kafka cluster metrics: %s", response.body());
return Collections.emptyMap();
}

JsonObject metrics;

try (JsonReader reader = Json.createReader(new StringReader(response.body()))) {
metrics = reader.readObject();
}

return metrics.getJsonObject("data")
.getJsonArray("result")
.stream()
.map(JsonObject.class::cast)
.map(entry -> {
JsonObject metric = entry.getJsonObject("metric");
String metricName = metric.getString(METRIC_NAME);

Map<String, String> attributes = metric.keySet()
.stream()
.filter(Predicate.not(METRIC_NAME::equals))
.map(key -> Map.entry(key, metric.getString(key)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

String value = entry.getJsonArray("value").getString(1);

return Map.entry(metricName, new Metrics.ValueMetric(value, attributes));
})
.collect(Collectors.groupingBy(
Map.Entry::getKey,
Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
});
BodyPublisher body = HttpRequest.BodyPublishers.ofString(
"query=" + URLEncoder.encode(query, StandardCharsets.UTF_8) + "&" +
"time=" + time);

return fetchMetrics("query", body, (metric, attributes) -> {
String value = metric.getJsonArray("value").getString(1); // ignore timestamp in first position
return new Metrics.ValueMetric(value, attributes);
});
}

CompletionStage<Map<String, List<Metrics.RangeMetric>>> queryRanges(String query) {
HttpClient client = HttpClient.newBuilder()
.build();

Instant now = Instant.now();

String start = Double.toString(now.minus(30, ChronoUnit.MINUTES).toEpochMilli() / 1000d);
String end = Double.toString(now.toEpochMilli() / 1000d);
BodyPublisher body = HttpRequest.BodyPublishers.ofString(
"query=" + URLEncoder.encode(query, StandardCharsets.UTF_8) + "&" +
"start=" + start + "&" +
"end=" + end + "&" +
"step=60");

return fetchMetrics("query_range", body, (metric, attributes) -> {
List<RangeEntry> values = metric.getJsonArray("values")
.stream()
.map(JsonArray.class::cast)
.map(e -> new Metrics.RangeEntry(
Instant.ofEpochMilli((long) (e.getJsonNumber(0).doubleValue() * 1000d)),
e.getString(1)
))
.toList();

return new Metrics.RangeMetric(values, attributes);
});
}

<M> CompletionStage<Map<String, List<M>>> fetchMetrics(String path,
BodyPublisher body,
BiFunction<JsonObject, Map<String, String>, M> builder) {

HttpClient client = HttpClient.newBuilder()
.build();

HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(prometheusUrl.get() + "/api/v1/query_range"))
.uri(URI.create(prometheusUrl.get() + "/api/v1/" + path))
.header("Content-type", "application/x-www-form-urlencoded")
.POST(HttpRequest.BodyPublishers.ofString(
"query=" + URLEncoder.encode(query, StandardCharsets.UTF_8) + "&" +
"start=" + start + "&" +
"end=" + end + "&" +
"step=60"))
.POST(body)
.build();

return client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenApply(response -> {
if (response.statusCode() != HttpURLConnection.HTTP_OK) {
logger.warnf("Failed to retrieve Kafka cluster metrics: %s", response.body());
return Collections.emptyMap();
}
.thenApply(response -> extractMetrics(response, builder));
}

JsonObject metrics;
<M> Map<String, List<M>> extractMetrics(HttpResponse<String> response,
BiFunction<JsonObject, Map<String, String>, M> builder) {

try (JsonReader reader = Json.createReader(new StringReader(response.body()))) {
metrics = reader.readObject();
}
if (response.statusCode() != HttpURLConnection.HTTP_OK) {
logger.warnf("Failed to retrieve Kafka cluster metrics: %s", response.body());
return Collections.emptyMap();
}

return metrics.getJsonObject("data")
.getJsonArray("result")
.stream()
.map(JsonObject.class::cast)
.map(entry -> {
JsonObject metric = entry.getJsonObject("metric");
String metricName = metric.getString(METRIC_NAME);

Map<String, String> attributes = metric.keySet()
.stream()
.filter(Predicate.not(METRIC_NAME::equals))
.map(key -> Map.entry(key, metric.getString(key)))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

List<RangeEntry> values = entry.getJsonArray("values")
.stream()
.map(JsonArray.class::cast)
.map(e -> new Metrics.RangeEntry(
Instant.ofEpochMilli((long) (e.getJsonNumber(0).doubleValue() * 1000d)),
e.getString(1)
))
.toList();

return Map.entry(metricName, new Metrics.RangeMetric(values, attributes));
})
.collect(Collectors.groupingBy(
Map.Entry::getKey,
Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
});
JsonObject metrics;

try (JsonReader reader = Json.createReader(new StringReader(response.body()))) {
metrics = reader.readObject();
}

return metrics.getJsonObject("data").getJsonArray("result")
.stream()
.map(JsonObject.class::cast)
.map(metric -> {
JsonObject meta = metric.getJsonObject("metric");
String metricName = meta.getString(METRIC_NAME);

Map<String, String> attributes = meta.keySet()
.stream()
.filter(Predicate.not(METRIC_NAME::equals))
.map(key -> Map.entry(key, meta.getString(key)))
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));

return Map.entry(metricName, builder.apply(metric, attributes));
})
.collect(groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toList())));
}
}

0 comments on commit e79c05b

Please sign in to comment.