Skip to content

Commit

Permalink
API: Show GPU stats on the node utilization page (#3608)
Browse files Browse the repository at this point in the history
  • Loading branch information
ekazachkova authored Jul 23, 2024
1 parent c95ef66 commit 1cec0a4
Show file tree
Hide file tree
Showing 47 changed files with 2,291 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.epam.pipeline.entity.cluster.PodDescription;
import com.epam.pipeline.entity.cluster.PodInstance;
import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats;
import com.epam.pipeline.entity.cluster.monitoring.gpu.GpuMetricsGranularity;
import com.epam.pipeline.entity.cluster.monitoring.gpu.GpuMonitoringStats;
import com.epam.pipeline.entity.pipeline.run.RunInfo;
import com.epam.pipeline.manager.cluster.EdgeServiceManager;
import com.epam.pipeline.manager.cluster.InstanceOfferManager;
Expand Down Expand Up @@ -102,6 +104,15 @@ public List<MonitoringStats> getStatsForNode(final String name,
return usageMonitoringManager.getStatsForNode(name, from, to);
}

@PreAuthorize(NODE_USAGE_READ)
public GpuMonitoringStats getGpuStatsForNode(final String name,
final LocalDateTime from,
final LocalDateTime to,
final List<GpuMetricsGranularity> granularity,
final boolean squashCharts) {
return usageMonitoringManager.getGpuStatsForNode(name, from, to, granularity, squashCharts);
}

@PreAuthorize(NODE_USAGE_READ)
public InputStream getUsageStatisticsFile(final String name, final LocalDateTime from, final LocalDateTime to,
final Duration interval, final MonitoringReportType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.epam.pipeline.entity.cluster.PodInstance;
import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats;
import com.epam.pipeline.acl.cluster.ClusterApiService;
import com.epam.pipeline.entity.cluster.monitoring.gpu.GpuMetricsGranularity;
import com.epam.pipeline.entity.cluster.monitoring.gpu.GpuMonitoringStats;
import com.epam.pipeline.entity.pipeline.run.RunInfo;
import com.epam.pipeline.manager.cluster.MonitoringReportType;
import io.swagger.annotations.Api;
Expand Down Expand Up @@ -67,6 +69,7 @@ public class ClusterController extends AbstractRestController {
private static final String TO = "to";
private static final String INTERVAL = "interval";
private static final String REPORT_TYPE = "type";
private static final String FALSE = "false";
private static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
private static final String REPORT_NAME_TEMPLATE = "%s_%s-%s-%s.%s";
private static final char TIME_SEPARATION_CHAR = ':';
Expand Down Expand Up @@ -197,7 +200,7 @@ public Result<NodeInstance> terminateNode(@PathVariable(value = NAME) final Stri
})
public Result<List<InstanceType>> loadAllInstanceTypes(
@RequestParam(required = false) final Long regionId,
@RequestParam(required = false, defaultValue = "false") final boolean toolInstances,
@RequestParam(required = false, defaultValue = FALSE) final boolean toolInstances,
@RequestParam(required = false) final Boolean spot) {
return toolInstances
? Result.success(clusterApiService.getAllowedToolInstanceTypes(regionId, spot))
Expand Down Expand Up @@ -241,6 +244,27 @@ public Result<List<MonitoringStats>> getNodeUsageStatistics(
return Result.success(clusterApiService.getStatsForNode(name, from, to));
}

@GetMapping("/cluster/node/{name}/usage/gpus")
@ResponseBody
@ApiOperation(
value = "Returns GPU stats from instance by given IP address",
notes = "Returns GPU stats from instance by given IP address",
produces = MediaType.APPLICATION_JSON_VALUE)
@ApiResponses(
value = {@ApiResponse(code = HTTP_STATUS_OK, message = API_STATUS_DESCRIPTION)
})
public Result<GpuMonitoringStats> getNodeUsageGpuStatistics(
@PathVariable(value = NAME) final String name,
@DateTimeFormat(pattern = DATE_TIME_FORMAT)
@RequestParam(value = FROM, required = false) final LocalDateTime from,
@DateTimeFormat(pattern = DATE_TIME_FORMAT)
@RequestParam(value = TO, required = false) final LocalDateTime to,
@RequestParam final List<GpuMetricsGranularity> granularity,
@RequestParam(required = false, defaultValue = FALSE)
final boolean squashCharts) {
return Result.success(clusterApiService.getGpuStatsForNode(name, from, to, granularity, squashCharts));
}

@GetMapping("/cluster/node/{name}/usage/report")
@ResponseBody
@ApiOperation(
Expand Down Expand Up @@ -308,7 +332,7 @@ public Result<List<PodInstance>> loadCorePods() {
produces = MediaType.APPLICATION_JSON_VALUE)
@ApiResponses(@ApiResponse(code = HTTP_STATUS_OK, message = API_STATUS_DESCRIPTION))
public Result<PodDescription> loadPodDescription(@RequestParam final String podId,
@RequestParam(required = false, defaultValue = "false")
@RequestParam(required = false, defaultValue = FALSE)
final boolean detailed) {
return Result.success(clusterApiService.getPodDescription(podId, detailed));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.epam.pipeline.entity.utils.DateUtils;
import com.epam.pipeline.exception.PipelineException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
Expand Down Expand Up @@ -50,7 +51,8 @@
public class MonitoringESDao {
protected static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy.MM.dd");

private static final String INDEX_NAME_TOKEN = "heapster-";
private static final String HEAPSTER_INDEX_NAME_TOKEN = "heapster-";
private static final String GPU_STAT_INDEX_NAME_TOKEN = "cp-gpu-monitor-";

private HeapsterElasticRestHighLevelClient heapsterClient;
private RestClient lowLevelClient;
Expand Down Expand Up @@ -78,12 +80,14 @@ public void deleteIndices(final int retentionPeriodDays) {
try {
final Response response = requestIndices();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) {
final String indicesToDelete = indices(reader)
final String indicesToDelete = indices(reader, HEAPSTER_INDEX_NAME_TOKEN, GPU_STAT_INDEX_NAME_TOKEN)
.map(this::withParsedDate)
.filter(pair -> pair.right != null && olderThanRetentionPeriod(retentionPeriodDays, pair.right))
.map(pair -> pair.left)
.collect(Collectors.joining(","));

if (StringUtils.isBlank(indicesToDelete)) {
return;
}
lowLevelClient.performRequest(HttpMethod.DELETE.name(), "/" + indicesToDelete);
}
} catch (IOException e) {
Expand All @@ -95,10 +99,10 @@ private Response requestIndices() throws IOException {
return lowLevelClient.performRequest(HttpMethod.GET.name(), "/_cat/indices");
}

private Stream<String> indices(final BufferedReader reader) {
private Stream<String> indices(final BufferedReader reader, final String... indexNamePrefixes) {
return reader.lines()
.flatMap(l -> Arrays.stream(l.split(" ")))
.filter(str -> str.startsWith(INDEX_NAME_TOKEN));
.filter(str -> Arrays.stream(indexNamePrefixes).anyMatch(str::startsWith));
}

private ImmutablePair<String, LocalDateTime> withParsedDate(final String name) {
Expand All @@ -108,7 +112,13 @@ private ImmutablePair<String, LocalDateTime> withParsedDate(final String name) {
}

private Optional<LocalDateTime> toLocalDateTime(final String name) {
final String datetimeString = name.substring(INDEX_NAME_TOKEN.length());
if (!(name.startsWith(HEAPSTER_INDEX_NAME_TOKEN) || name.startsWith(GPU_STAT_INDEX_NAME_TOKEN))) {
return Optional.empty();
}
final int indexTokenLength = name.startsWith(HEAPSTER_INDEX_NAME_TOKEN)
? HEAPSTER_INDEX_NAME_TOKEN.length()
: GPU_STAT_INDEX_NAME_TOKEN.length();
final String datetimeString = name.substring(indexTokenLength);
try {
return Optional.of(LocalDate.parse(datetimeString, DATE_FORMATTER)
.atStartOfDay(ZoneOffset.UTC)
Expand All @@ -122,11 +132,11 @@ private boolean olderThanRetentionPeriod(final int retentionPeriod, final LocalD
return date.isBefore(DateUtils.nowUTC().minusDays(retentionPeriod + 1L));
}

public Optional<LocalDateTime> oldestIndexDate() {
public Optional<LocalDateTime> oldestIndexDate(final String... indexPrefixes) {
try {
final Response response = requestIndices();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) {
return indices(reader)
return indices(reader, indexPrefixes)
.map(this::toLocalDateTime)
.filter(Optional::isPresent)
.map(Optional::get)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.pipeline.dao.monitoring.metricrequester;

import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

public abstract class AbstractGPUMetricsRequester extends AbstractMetricRequester {
protected static final String GPU_DEVICE_NAME = "device_name";
protected static final String INDEX_NAME_PATTERN = "cp-gpu-monitor-%s";

AbstractGPUMetricsRequester(final HeapsterElasticRestHighLevelClient client) {
super(client, INDEX_NAME_PATTERN);
}

protected String getDeviceName(final SearchHits hits) {
return Optional.ofNullable(hits)
.map(SearchHits::getHits)
.map(Arrays::stream)
.orElseGet(Stream::empty)
.findFirst()
.map(SearchHit::getSourceAsMap)
.map(sourceMap -> sourceMap.get(FIELD_METRICS_TAGS))
.filter(metricsTags -> metricsTags instanceof Map)
.map(Map.class::cast)
.map(metricsTags -> metricsTags.get(GPU_DEVICE_NAME))
.map(String.class::cast)
.orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.search.aggregations.metrics.ParsedSingleValueNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;

Expand Down Expand Up @@ -81,6 +82,7 @@ public abstract class AbstractMetricRequester implements MetricRequester, Monito
protected static final String WORKING_SET = "working_set";
protected static final String CPU_CAPACITY = "cpu_capacity";
protected static final String CPU_UTILIZATION = "cpu_utilization";
protected static final String GPU_UTILIZATION = "gpu_utilization";
protected static final String MEMORY_UTILIZATION = "memory_utilization";
protected static final String MEMORY_CAPACITY = "memory_capacity";
protected static final String LIMIT = "limit";
Expand All @@ -101,6 +103,7 @@ public abstract class AbstractMetricRequester implements MetricRequester, Monito

protected static final String AVG_AGGREGATION = "avg_";
protected static final String MAX_AGGREGATION = "max_";
protected static final String MIN_AGGREGATION = "min_";
protected static final String DIVISION_AGGREGATION = "division_";
protected static final String AGGREGATION_POD_NAME = "pod_name";
protected static final String FIELD_POD_NAME_RAW = "pod_name.raw";
Expand All @@ -112,9 +115,17 @@ public abstract class AbstractMetricRequester implements MetricRequester, Monito
protected static final String SWAP_FILESYSTEM = "tmpfs";

private final HeapsterElasticRestHighLevelClient client;
private final String indexNamePattern;

AbstractMetricRequester(final HeapsterElasticRestHighLevelClient client) {
this.client = client;
this.indexNamePattern = INDEX_NAME_PATTERN;
}

AbstractMetricRequester(final HeapsterElasticRestHighLevelClient client,
final String indexNamePattern) {
this.client = client;
this.indexNamePattern = indexNamePattern;
}

protected abstract ELKUsageMetric metric();
Expand Down Expand Up @@ -199,13 +210,13 @@ protected SearchRequest request(final LocalDateTime from, final LocalDateTime to
.indicesOptions(INDICES_OPTIONS);
}

private static String[] getIndexNames(final LocalDateTime from, final LocalDateTime to) {
private String[] getIndexNames(final LocalDateTime from, final LocalDateTime to) {
final LocalDate fromDate = from.toLocalDate();
final LocalDate toDate = to.toLocalDate();
return Stream.iterate(fromDate, date -> date.plusDays(1))
.limit(Period.between(fromDate, toDate).getDays() + 1)
.map(date -> date.format(DATE_FORMATTER))
.map(str -> String.format(INDEX_NAME_PATTERN, str))
.map(str -> String.format(indexNamePattern, str))
.toArray(String[]::new);
}

Expand Down Expand Up @@ -250,12 +261,16 @@ protected DateHistogramAggregationBuilder dateHistogram(final String name, final
}

protected AvgAggregationBuilder average(final String name, final String field) {
return AggregationBuilders.avg(name)
return AggregationBuilders.avg(AVG_AGGREGATION + name)
.field(field(field));
}

protected MaxAggregationBuilder max(final String name, final String field) {
return AggregationBuilders.max(name).field(field(field));
return AggregationBuilders.max(MAX_AGGREGATION + name).field(field(field));
}

protected MinAggregationBuilder min(final String name, final String field) {
return AggregationBuilders.min(MIN_AGGREGATION + name).field(field(field));
}

protected PipelineAggregationBuilder division(final String name, final String divider, final String divisor) {
Expand Down Expand Up @@ -286,4 +301,8 @@ protected TermsAggregationBuilder ordered(final TermsAggregationBuilder terms) {
}
return terms;
}

protected Double getDoubleValue(final List<Aggregation> aggregations, final String metricName) {
return doubleValue(aggregations, metricName).orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public SearchRequest buildRequest(final Collection<String> resourceIds, final Lo
.aggregation(ordered(AggregationBuilders.terms(AGGREGATION_NODE_NAME))
.field(path(FIELD_METRICS_TAGS, FIELD_NODENAME_RAW))
.size(resourceIds.size())
.subAggregation(average(AVG_AGGREGATION + USAGE_RATE, USAGE_RATE))));
.subAggregation(average(USAGE_RATE, USAGE_RATE))));
}

@Override
Expand All @@ -80,9 +80,9 @@ protected SearchRequest buildStatsRequest(final String nodeName, final LocalDate
statsQuery(nodeName, NODE, from, to)
.size(0)
.aggregation(dateHistogram(CPU_HISTOGRAM, interval)
.subAggregation(average(AVG_AGGREGATION + CPU_UTILIZATION, NODE_UTILIZATION))
.subAggregation(max(MAX_AGGREGATION + CPU_UTILIZATION, NODE_UTILIZATION))
.subAggregation(average(AVG_AGGREGATION + CPU_CAPACITY, NODE_CAPACITY))));
.subAggregation(average(CPU_UTILIZATION, NODE_UTILIZATION))
.subAggregation(max(CPU_UTILIZATION, NODE_UTILIZATION))
.subAggregation(average(CPU_CAPACITY, NODE_CAPACITY))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public SearchRequest buildRequest(final Collection<String> resourceIds, final Lo
.field(path(FIELD_METRICS_TAGS, FIELD_NODENAME_RAW))
.subAggregation(ordered(AggregationBuilders.terms(AGGREGATION_DISK_NAME))
.field(path(FIELD_METRICS_TAGS, RESOURCE_ID))
.subAggregation(average(AVG_AGGREGATION + LIMIT, LIMIT))
.subAggregation(average(AVG_AGGREGATION + USAGE, USAGE)))));
.subAggregation(average(LIMIT, LIMIT))
.subAggregation(average(USAGE, USAGE)))));
}

@Override
Expand Down Expand Up @@ -112,8 +112,8 @@ protected SearchRequest buildStatsRequest(final String nodeName, final LocalDate
.aggregation(ordered(AggregationBuilders.terms(AGGREGATION_DISK_NAME))
.field(path(FIELD_METRICS_TAGS, RESOURCE_ID))
.subAggregation(dateHistogram(DISKS_HISTOGRAM, interval)
.subAggregation(average(AVG_AGGREGATION + USAGE, USAGE))
.subAggregation(average(AVG_AGGREGATION + LIMIT, LIMIT)))));
.subAggregation(average(USAGE, USAGE))
.subAggregation(average(LIMIT, LIMIT)))));
}

@Override
Expand Down
Loading

0 comments on commit 1cec0a4

Please sign in to comment.