diff --git a/api/src/main/java/com/epam/pipeline/acl/cluster/ClusterApiService.java b/api/src/main/java/com/epam/pipeline/acl/cluster/ClusterApiService.java index baca1f7c84..b859a71d48 100644 --- a/api/src/main/java/com/epam/pipeline/acl/cluster/ClusterApiService.java +++ b/api/src/main/java/com/epam/pipeline/acl/cluster/ClusterApiService.java @@ -31,6 +31,8 @@ import com.epam.pipeline.entity.cluster.PodDescription; import com.epam.pipeline.entity.cluster.PodInstance; import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats; +import com.epam.pipeline.entity.cluster.monitoring.gpu.GpuMetricsGranularity; +import com.epam.pipeline.entity.cluster.monitoring.gpu.GpuMonitoringStats; import com.epam.pipeline.entity.pipeline.run.RunInfo; import com.epam.pipeline.manager.cluster.EdgeServiceManager; import com.epam.pipeline.manager.cluster.InstanceOfferManager; @@ -102,6 +104,15 @@ public List getStatsForNode(final String name, return usageMonitoringManager.getStatsForNode(name, from, to); } + @PreAuthorize(NODE_USAGE_READ) + public GpuMonitoringStats getGpuStatsForNode(final String name, + final LocalDateTime from, + final LocalDateTime to, + final List granularity, + final boolean squashCharts) { + return usageMonitoringManager.getGpuStatsForNode(name, from, to, granularity, squashCharts); + } + @PreAuthorize(NODE_USAGE_READ) public InputStream getUsageStatisticsFile(final String name, final LocalDateTime from, final LocalDateTime to, final Duration interval, final MonitoringReportType type) { diff --git a/api/src/main/java/com/epam/pipeline/controller/cluster/ClusterController.java b/api/src/main/java/com/epam/pipeline/controller/cluster/ClusterController.java index 23cf333eb8..f478680db8 100644 --- a/api/src/main/java/com/epam/pipeline/controller/cluster/ClusterController.java +++ b/api/src/main/java/com/epam/pipeline/controller/cluster/ClusterController.java @@ -31,6 +31,8 @@ import com.epam.pipeline.entity.cluster.PodInstance; import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats; import com.epam.pipeline.acl.cluster.ClusterApiService; +import com.epam.pipeline.entity.cluster.monitoring.gpu.GpuMetricsGranularity; +import com.epam.pipeline.entity.cluster.monitoring.gpu.GpuMonitoringStats; import com.epam.pipeline.entity.pipeline.run.RunInfo; import com.epam.pipeline.manager.cluster.MonitoringReportType; import io.swagger.annotations.Api; @@ -67,6 +69,7 @@ public class ClusterController extends AbstractRestController { private static final String TO = "to"; private static final String INTERVAL = "interval"; private static final String REPORT_TYPE = "type"; + private static final String FALSE = "false"; private static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; private static final String REPORT_NAME_TEMPLATE = "%s_%s-%s-%s.%s"; private static final char TIME_SEPARATION_CHAR = ':'; @@ -197,7 +200,7 @@ public Result terminateNode(@PathVariable(value = NAME) final Stri }) public Result> loadAllInstanceTypes( @RequestParam(required = false) final Long regionId, - @RequestParam(required = false, defaultValue = "false") final boolean toolInstances, + @RequestParam(required = false, defaultValue = FALSE) final boolean toolInstances, @RequestParam(required = false) final Boolean spot) { return toolInstances ? Result.success(clusterApiService.getAllowedToolInstanceTypes(regionId, spot)) @@ -241,6 +244,27 @@ public Result> getNodeUsageStatistics( return Result.success(clusterApiService.getStatsForNode(name, from, to)); } + @GetMapping("/cluster/node/{name}/usage/gpus") + @ResponseBody + @ApiOperation( + value = "Returns GPU stats from instance by given IP address", + notes = "Returns GPU stats from instance by given IP address", + produces = MediaType.APPLICATION_JSON_VALUE) + @ApiResponses( + value = {@ApiResponse(code = HTTP_STATUS_OK, message = API_STATUS_DESCRIPTION) + }) + public Result getNodeUsageGpuStatistics( + @PathVariable(value = NAME) final String name, + @DateTimeFormat(pattern = DATE_TIME_FORMAT) + @RequestParam(value = FROM, required = false) final LocalDateTime from, + @DateTimeFormat(pattern = DATE_TIME_FORMAT) + @RequestParam(value = TO, required = false) final LocalDateTime to, + @RequestParam final List granularity, + @RequestParam(required = false, defaultValue = FALSE) + final boolean squashCharts) { + return Result.success(clusterApiService.getGpuStatsForNode(name, from, to, granularity, squashCharts)); + } + @GetMapping("/cluster/node/{name}/usage/report") @ResponseBody @ApiOperation( @@ -308,7 +332,7 @@ public Result> loadCorePods() { produces = MediaType.APPLICATION_JSON_VALUE) @ApiResponses(@ApiResponse(code = HTTP_STATUS_OK, message = API_STATUS_DESCRIPTION)) public Result loadPodDescription(@RequestParam final String podId, - @RequestParam(required = false, defaultValue = "false") + @RequestParam(required = false, defaultValue = FALSE) final boolean detailed) { return Result.success(clusterApiService.getPodDescription(podId, detailed)); } diff --git a/api/src/main/java/com/epam/pipeline/dao/monitoring/MonitoringESDao.java b/api/src/main/java/com/epam/pipeline/dao/monitoring/MonitoringESDao.java index 09a1aadac5..0f263e17c0 100644 --- a/api/src/main/java/com/epam/pipeline/dao/monitoring/MonitoringESDao.java +++ b/api/src/main/java/com/epam/pipeline/dao/monitoring/MonitoringESDao.java @@ -22,6 +22,7 @@ import com.epam.pipeline.entity.utils.DateUtils; import com.epam.pipeline.exception.PipelineException; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; @@ -50,7 +51,8 @@ public class MonitoringESDao { protected static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy.MM.dd"); - private static final String INDEX_NAME_TOKEN = "heapster-"; + private static final String HEAPSTER_INDEX_NAME_TOKEN = "heapster-"; + private static final String GPU_STAT_INDEX_NAME_TOKEN = "cp-gpu-monitor-"; private HeapsterElasticRestHighLevelClient heapsterClient; private RestClient lowLevelClient; @@ -78,12 +80,14 @@ public void deleteIndices(final int retentionPeriodDays) { try { final Response response = requestIndices(); try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) { - final String indicesToDelete = indices(reader) + final String indicesToDelete = indices(reader, HEAPSTER_INDEX_NAME_TOKEN, GPU_STAT_INDEX_NAME_TOKEN) .map(this::withParsedDate) .filter(pair -> pair.right != null && olderThanRetentionPeriod(retentionPeriodDays, pair.right)) .map(pair -> pair.left) .collect(Collectors.joining(",")); - + if (StringUtils.isBlank(indicesToDelete)) { + return; + } lowLevelClient.performRequest(HttpMethod.DELETE.name(), "/" + indicesToDelete); } } catch (IOException e) { @@ -95,10 +99,10 @@ private Response requestIndices() throws IOException { return lowLevelClient.performRequest(HttpMethod.GET.name(), "/_cat/indices"); } - private Stream indices(final BufferedReader reader) { + private Stream indices(final BufferedReader reader, final String... indexNamePrefixes) { return reader.lines() .flatMap(l -> Arrays.stream(l.split(" "))) - .filter(str -> str.startsWith(INDEX_NAME_TOKEN)); + .filter(str -> Arrays.stream(indexNamePrefixes).anyMatch(str::startsWith)); } private ImmutablePair withParsedDate(final String name) { @@ -108,7 +112,13 @@ private ImmutablePair withParsedDate(final String name) { } private Optional toLocalDateTime(final String name) { - final String datetimeString = name.substring(INDEX_NAME_TOKEN.length()); + if (!(name.startsWith(HEAPSTER_INDEX_NAME_TOKEN) || name.startsWith(GPU_STAT_INDEX_NAME_TOKEN))) { + return Optional.empty(); + } + final int indexTokenLength = name.startsWith(HEAPSTER_INDEX_NAME_TOKEN) + ? HEAPSTER_INDEX_NAME_TOKEN.length() + : GPU_STAT_INDEX_NAME_TOKEN.length(); + final String datetimeString = name.substring(indexTokenLength); try { return Optional.of(LocalDate.parse(datetimeString, DATE_FORMATTER) .atStartOfDay(ZoneOffset.UTC) @@ -122,11 +132,11 @@ private boolean olderThanRetentionPeriod(final int retentionPeriod, final LocalD return date.isBefore(DateUtils.nowUTC().minusDays(retentionPeriod + 1L)); } - public Optional oldestIndexDate() { + public Optional oldestIndexDate(final String... indexPrefixes) { try { final Response response = requestIndices(); try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent()))) { - return indices(reader) + return indices(reader, indexPrefixes) .map(this::toLocalDateTime) .filter(Optional::isPresent) .map(Optional::get) diff --git a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/AbstractGPUMetricsRequester.java b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/AbstractGPUMetricsRequester.java new file mode 100644 index 0000000000..4a627f7e17 --- /dev/null +++ b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/AbstractGPUMetricsRequester.java @@ -0,0 +1,49 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.dao.monitoring.metricrequester; + +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; + +public abstract class AbstractGPUMetricsRequester extends AbstractMetricRequester { + protected static final String GPU_DEVICE_NAME = "device_name"; + protected static final String INDEX_NAME_PATTERN = "cp-gpu-monitor-%s"; + + AbstractGPUMetricsRequester(final HeapsterElasticRestHighLevelClient client) { + super(client, INDEX_NAME_PATTERN); + } + + protected String getDeviceName(final SearchHits hits) { + return Optional.ofNullable(hits) + .map(SearchHits::getHits) + .map(Arrays::stream) + .orElseGet(Stream::empty) + .findFirst() + .map(SearchHit::getSourceAsMap) + .map(sourceMap -> sourceMap.get(FIELD_METRICS_TAGS)) + .filter(metricsTags -> metricsTags instanceof Map) + .map(Map.class::cast) + .map(metricsTags -> metricsTags.get(GPU_DEVICE_NAME)) + .map(String.class::cast) + .orElse(null); + } +} diff --git a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/AbstractMetricRequester.java b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/AbstractMetricRequester.java index a336ef1722..abcf66e764 100644 --- a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/AbstractMetricRequester.java +++ b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/AbstractMetricRequester.java @@ -38,6 +38,7 @@ import org.elasticsearch.search.aggregations.metrics.ParsedSingleValueNumericMetricsAggregation; import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregationBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -81,6 +82,7 @@ public abstract class AbstractMetricRequester implements MetricRequester, Monito protected static final String WORKING_SET = "working_set"; protected static final String CPU_CAPACITY = "cpu_capacity"; protected static final String CPU_UTILIZATION = "cpu_utilization"; + protected static final String GPU_UTILIZATION = "gpu_utilization"; protected static final String MEMORY_UTILIZATION = "memory_utilization"; protected static final String MEMORY_CAPACITY = "memory_capacity"; protected static final String LIMIT = "limit"; @@ -101,6 +103,7 @@ public abstract class AbstractMetricRequester implements MetricRequester, Monito protected static final String AVG_AGGREGATION = "avg_"; protected static final String MAX_AGGREGATION = "max_"; + protected static final String MIN_AGGREGATION = "min_"; protected static final String DIVISION_AGGREGATION = "division_"; protected static final String AGGREGATION_POD_NAME = "pod_name"; protected static final String FIELD_POD_NAME_RAW = "pod_name.raw"; @@ -112,9 +115,17 @@ public abstract class AbstractMetricRequester implements MetricRequester, Monito protected static final String SWAP_FILESYSTEM = "tmpfs"; private final HeapsterElasticRestHighLevelClient client; + private final String indexNamePattern; AbstractMetricRequester(final HeapsterElasticRestHighLevelClient client) { this.client = client; + this.indexNamePattern = INDEX_NAME_PATTERN; + } + + AbstractMetricRequester(final HeapsterElasticRestHighLevelClient client, + final String indexNamePattern) { + this.client = client; + this.indexNamePattern = indexNamePattern; } protected abstract ELKUsageMetric metric(); @@ -199,13 +210,13 @@ protected SearchRequest request(final LocalDateTime from, final LocalDateTime to .indicesOptions(INDICES_OPTIONS); } - private static String[] getIndexNames(final LocalDateTime from, final LocalDateTime to) { + private String[] getIndexNames(final LocalDateTime from, final LocalDateTime to) { final LocalDate fromDate = from.toLocalDate(); final LocalDate toDate = to.toLocalDate(); return Stream.iterate(fromDate, date -> date.plusDays(1)) .limit(Period.between(fromDate, toDate).getDays() + 1) .map(date -> date.format(DATE_FORMATTER)) - .map(str -> String.format(INDEX_NAME_PATTERN, str)) + .map(str -> String.format(indexNamePattern, str)) .toArray(String[]::new); } @@ -250,12 +261,16 @@ protected DateHistogramAggregationBuilder dateHistogram(final String name, final } protected AvgAggregationBuilder average(final String name, final String field) { - return AggregationBuilders.avg(name) + return AggregationBuilders.avg(AVG_AGGREGATION + name) .field(field(field)); } protected MaxAggregationBuilder max(final String name, final String field) { - return AggregationBuilders.max(name).field(field(field)); + return AggregationBuilders.max(MAX_AGGREGATION + name).field(field(field)); + } + + protected MinAggregationBuilder min(final String name, final String field) { + return AggregationBuilders.min(MIN_AGGREGATION + name).field(field(field)); } protected PipelineAggregationBuilder division(final String name, final String divider, final String divisor) { @@ -286,4 +301,8 @@ protected TermsAggregationBuilder ordered(final TermsAggregationBuilder terms) { } return terms; } + + protected Double getDoubleValue(final List aggregations, final String metricName) { + return doubleValue(aggregations, metricName).orElse(null); + } } diff --git a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/CPURequester.java b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/CPURequester.java index b8f1dd1862..0d7ec8a9e5 100644 --- a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/CPURequester.java +++ b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/CPURequester.java @@ -64,7 +64,7 @@ public SearchRequest buildRequest(final Collection resourceIds, final Lo .aggregation(ordered(AggregationBuilders.terms(AGGREGATION_NODE_NAME)) .field(path(FIELD_METRICS_TAGS, FIELD_NODENAME_RAW)) .size(resourceIds.size()) - .subAggregation(average(AVG_AGGREGATION + USAGE_RATE, USAGE_RATE)))); + .subAggregation(average(USAGE_RATE, USAGE_RATE)))); } @Override @@ -80,9 +80,9 @@ protected SearchRequest buildStatsRequest(final String nodeName, final LocalDate statsQuery(nodeName, NODE, from, to) .size(0) .aggregation(dateHistogram(CPU_HISTOGRAM, interval) - .subAggregation(average(AVG_AGGREGATION + CPU_UTILIZATION, NODE_UTILIZATION)) - .subAggregation(max(MAX_AGGREGATION + CPU_UTILIZATION, NODE_UTILIZATION)) - .subAggregation(average(AVG_AGGREGATION + CPU_CAPACITY, NODE_CAPACITY)))); + .subAggregation(average(CPU_UTILIZATION, NODE_UTILIZATION)) + .subAggregation(max(CPU_UTILIZATION, NODE_UTILIZATION)) + .subAggregation(average(CPU_CAPACITY, NODE_CAPACITY)))); } @Override diff --git a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/FSRequester.java b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/FSRequester.java index 025377ec94..a83ea3c387 100644 --- a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/FSRequester.java +++ b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/FSRequester.java @@ -69,8 +69,8 @@ public SearchRequest buildRequest(final Collection resourceIds, final Lo .field(path(FIELD_METRICS_TAGS, FIELD_NODENAME_RAW)) .subAggregation(ordered(AggregationBuilders.terms(AGGREGATION_DISK_NAME)) .field(path(FIELD_METRICS_TAGS, RESOURCE_ID)) - .subAggregation(average(AVG_AGGREGATION + LIMIT, LIMIT)) - .subAggregation(average(AVG_AGGREGATION + USAGE, USAGE))))); + .subAggregation(average(LIMIT, LIMIT)) + .subAggregation(average(USAGE, USAGE))))); } @Override @@ -112,8 +112,8 @@ protected SearchRequest buildStatsRequest(final String nodeName, final LocalDate .aggregation(ordered(AggregationBuilders.terms(AGGREGATION_DISK_NAME)) .field(path(FIELD_METRICS_TAGS, RESOURCE_ID)) .subAggregation(dateHistogram(DISKS_HISTOGRAM, interval) - .subAggregation(average(AVG_AGGREGATION + USAGE, USAGE)) - .subAggregation(average(AVG_AGGREGATION + LIMIT, LIMIT))))); + .subAggregation(average(USAGE, USAGE)) + .subAggregation(average(LIMIT, LIMIT))))); } @Override diff --git a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/GPUAggregationRequester.java b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/GPUAggregationRequester.java new file mode 100644 index 0000000000..5bb852c701 --- /dev/null +++ b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/GPUAggregationRequester.java @@ -0,0 +1,158 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.dao.monitoring.metricrequester; + +import com.epam.pipeline.entity.cluster.monitoring.ELKUsageMetric; +import com.epam.pipeline.entity.cluster.monitoring.MonitoringMetrics; +import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats; +import org.apache.commons.collections4.CollectionUtils; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class GPUAggregationRequester extends AbstractGPUMetricsRequester { + private static final String AVG_UTILIZATION_GPU_FIELD = "avg_utilization_gpu"; + private static final String AVG_UTILIZATION_MEMORY_FIELD = "avg_utilization_memory"; + private static final String MIN_UTILIZATION_GPU_FIELD = "min_utilization_gpu"; + private static final String MIN_UTILIZATION_MEMORY_FIELD = "min_utilization_memory"; + private static final String MAX_UTILIZATION_GPU_FIELD = "max_utilization_gpu"; + private static final String MAX_UTILIZATION_MEMORY_FIELD = "max_utilization_memory"; + private static final String ACTIVE_GPUS_FIELD = "active_gpus"; + private static final String CHARTS_HISTOGRAM = "charts_histogram"; + + public GPUAggregationRequester(final HeapsterElasticRestHighLevelClient client) { + super(client); + } + + @Override + public ELKUsageMetric metric() { + return ELKUsageMetric.GPU_AGGS; + } + + @Override + public SearchRequest buildStatsRequest(final String nodeName, final LocalDateTime from, final LocalDateTime to, + final Duration interval) { + final SearchSourceBuilder aggregation = statsQuery(nodeName, NODE, from, to) + .size(1) + .aggregation(buildHistogram(interval)); + return request(from, to, aggregation); + } + + @Override + protected List parseStatsResponse(final SearchResponse response) { + final List aggregations = Optional.ofNullable(response.getAggregations()) + .map(Aggregations::asList) + .map(List::stream) + .orElseGet(Stream::empty) + .collect(Collectors.toList()); + if (CollectionUtils.isEmpty(aggregations)) { + return Collections.emptyList(); + } + final String deviceName = getDeviceName(response.getHits()); + return getBucketStream(aggregations) + .map(this::bucketToChart) + .peek(monitoringStats -> monitoringStats.setGpuDeviceName(deviceName)) + .collect(Collectors.toList()); + } + + @Override + public SearchRequest buildRequest(final Collection resourceIds, final LocalDateTime from, + final LocalDateTime to, final Map additional) { + return null; + } + + @Override + public Map parseResponse(SearchResponse response) { + return Collections.emptyMap(); + } + + private Stream getBucketStream(final List aggregations) { + return aggregations.stream() + .filter(agg -> CHARTS_HISTOGRAM.equals(agg.getName())) + .filter(agg -> agg instanceof MultiBucketsAggregation) + .map(MultiBucketsAggregation.class::cast) + .findFirst() + .map(MultiBucketsAggregation::getBuckets) + .map(List::stream) + .orElseGet(Stream::empty); + } + + private DateHistogramAggregationBuilder buildHistogram(final Duration interval) { + final DateHistogramAggregationBuilder histogramBuilder = dateHistogram(CHARTS_HISTOGRAM, interval); + + // gpu utilization aggregations + histogramBuilder + .subAggregation(average(AVG_AGGREGATION + GPU_UTILIZATION, AVG_UTILIZATION_GPU_FIELD)) + .subAggregation(min(MIN_AGGREGATION + GPU_UTILIZATION, MIN_UTILIZATION_GPU_FIELD)) + .subAggregation(max(MAX_AGGREGATION + GPU_UTILIZATION, MAX_UTILIZATION_GPU_FIELD)); + + // gpu memory utilization aggregations + histogramBuilder + .subAggregation(average(AVG_AGGREGATION + MEMORY_UTILIZATION, AVG_UTILIZATION_MEMORY_FIELD)) + .subAggregation(min(MIN_AGGREGATION + MEMORY_UTILIZATION, MIN_UTILIZATION_MEMORY_FIELD)) + .subAggregation(max(MAX_AGGREGATION + MEMORY_UTILIZATION, MAX_UTILIZATION_MEMORY_FIELD)); + + // active gpu aggregations + histogramBuilder + .subAggregation(average(ACTIVE_GPUS_FIELD, ACTIVE_GPUS_FIELD)) + .subAggregation(max(ACTIVE_GPUS_FIELD, ACTIVE_GPUS_FIELD)) + .subAggregation(min(ACTIVE_GPUS_FIELD, ACTIVE_GPUS_FIELD)); + return histogramBuilder; + } + + private MonitoringStats bucketToChart(final MultiBucketsAggregation.Bucket bucket) { + final MonitoringStats monitoringStats = new MonitoringStats(); + Optional.ofNullable(bucket.getKeyAsString()).ifPresent(monitoringStats::setStartTime); + monitoringStats.setGpuUsage(toGpuUsage(bucket)); + return monitoringStats; + } + + private MonitoringStats.GPUUsage toGpuUsage(final MultiBucketsAggregation.Bucket bucket) { + final List aggregations = aggregations(bucket); + return MonitoringStats.GPUUsage.builder() + .gpuUtilization(buildMetrics(aggregations, GPU_UTILIZATION)) + .gpuMemoryUtilization(buildMetrics(aggregations, MEMORY_UTILIZATION)) + .activeGpus(MonitoringMetrics.builder() + .average(getDoubleValue(aggregations, AVG_AGGREGATION + ACTIVE_GPUS_FIELD)) + .min(getDoubleValue(aggregations, MIN_AGGREGATION + ACTIVE_GPUS_FIELD)) + .max(getDoubleValue(aggregations, MAX_AGGREGATION + ACTIVE_GPUS_FIELD)) + .build()) + .build(); + } + + private MonitoringMetrics buildMetrics(final List aggregations, final String metricName) { + return MonitoringMetrics.builder() + .average(getDoubleValue(aggregations, AVG_AGGREGATION + AVG_AGGREGATION + metricName)) + .min(getDoubleValue(aggregations, MIN_AGGREGATION + MIN_AGGREGATION + metricName)) + .max(getDoubleValue(aggregations, MAX_AGGREGATION + MAX_AGGREGATION + metricName)) + .build(); + } +} diff --git a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/GPUDetailsRequester.java b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/GPUDetailsRequester.java new file mode 100644 index 0000000000..1026066ac7 --- /dev/null +++ b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/GPUDetailsRequester.java @@ -0,0 +1,170 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.dao.monitoring.metricrequester; + +import com.epam.pipeline.entity.cluster.monitoring.ELKUsageMetric; +import com.epam.pipeline.entity.cluster.monitoring.MonitoringMetrics; +import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats; +import com.nimbusds.jose.util.Pair; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class GPUDetailsRequester extends AbstractGPUMetricsRequester { + private static final String DETAILS_HISTOGRAM = "details_histogram"; + private static final String UTILIZATION_GPU = "utilization_gpu"; + private static final String UTILIZATION_GPU_MEMORY = "utilization_memory"; + private static final String USED_GPU_MEMORY = "used_memory"; + private static final String DEVICE_ID_AGGREGATION = "device_id"; + private static final String DEVICE_ID_RAW_FIELD = "index.raw"; + + public GPUDetailsRequester(final HeapsterElasticRestHighLevelClient client) { + super(client); + } + + @Override + public ELKUsageMetric metric() { + return ELKUsageMetric.GPU; + } + + @Override + public SearchRequest buildStatsRequest(final String nodeName, final LocalDateTime from, final LocalDateTime to, + final Duration interval) { + final SearchSourceBuilder aggregation = statsQuery(nodeName, NODE, from, to) + .size(1) + .aggregation(ordered(AggregationBuilders.terms(DEVICE_ID_AGGREGATION) + .field(path(FIELD_METRICS_TAGS, DEVICE_ID_RAW_FIELD)) + .subAggregation(buildHistogram(interval)))); + return request(from, to, aggregation); + } + + @Override + protected List parseStatsResponse(final SearchResponse response) { + final String deviceName = getDeviceName(response.getHits()); + return Optional.ofNullable(response.getAggregations()) + .map(Aggregations::asList) + .map(List::stream) + .orElseGet(Stream::empty) + .filter(agg -> DEVICE_ID_AGGREGATION.equals(agg.getName())) + .filter(agg -> agg instanceof Terms) + .map(Terms.class::cast) + .findFirst() + .map(term -> Optional.of(term.getBuckets()) + .map(List::stream) + .orElseGet(Stream::empty) + .collect(Collectors.toMap( + MultiBucketsAggregation.Bucket::getKeyAsString, this::getDetailsBuckets)) + .entrySet().stream() + .flatMap(entry -> entry.getValue().stream().map(value -> Pair.of(entry.getKey(), value))) + .collect(Collectors.groupingBy(pair -> pair.getRight().getKeyAsString())) + .entrySet().stream() + .map(entry -> toChart(entry.getKey(), entry.getValue())) + .peek(chart -> chart.setGpuDeviceName(deviceName)) + .collect(Collectors.toList())) + .orElse(Collections.emptyList()); + } + + @Override + public SearchRequest buildRequest(final Collection resourceIds, final LocalDateTime from, + final LocalDateTime to, final Map additional) { + return null; + } + + @Override + public Map parseResponse(final SearchResponse response) { + return Collections.emptyMap(); + } + + private List getDetailsBuckets(final Terms.Bucket bucket) { + return Optional.ofNullable(bucket.getAggregations()) + .map(Aggregations::asList) + .map(List::stream) + .orElseGet(Stream::empty) + .filter(agg -> DETAILS_HISTOGRAM.equals(agg.getName())) + .findFirst() + .filter(agg -> agg instanceof MultiBucketsAggregation) + .map(MultiBucketsAggregation.class::cast) + .map(MultiBucketsAggregation::getBuckets) + .map(List::stream) + .orElseGet(Stream::empty) + .collect(Collectors.toList()); + } + + private MonitoringStats toChart(final String startDate, + final List> pairs) { + final MonitoringStats monitoringStats = new MonitoringStats(); + monitoringStats.setStartTime(startDate); + monitoringStats.setGpuDetails(pairs.stream().collect(Collectors.toMap(Pair::getLeft, this::toGpuUsage))); + return monitoringStats; + } + + private MonitoringStats.GPUUsage toGpuUsage(final Pair pair) { + final List aggregations = aggregations(pair.getRight()); + return MonitoringStats.GPUUsage.builder() + .gpuUtilization(buildMetrics(aggregations, UTILIZATION_GPU)) + .gpuMemoryUtilization(buildMetrics(aggregations, UTILIZATION_GPU_MEMORY)) + .gpuMemoryUsed(buildMetrics(aggregations, USED_GPU_MEMORY)) + .build(); + } + + private DateHistogramAggregationBuilder buildHistogram(final Duration interval) { + final DateHistogramAggregationBuilder histogramBuilder = dateHistogram(DETAILS_HISTOGRAM, interval); + + // gpu utilization aggregations + histogramBuilder + .subAggregation(average(UTILIZATION_GPU, UTILIZATION_GPU)) + .subAggregation(min(UTILIZATION_GPU, UTILIZATION_GPU)) + .subAggregation(max(UTILIZATION_GPU, UTILIZATION_GPU)); + + // gpu memory utilization aggregations + histogramBuilder + .subAggregation(average(UTILIZATION_GPU_MEMORY, UTILIZATION_GPU_MEMORY)) + .subAggregation(min(UTILIZATION_GPU_MEMORY, UTILIZATION_GPU_MEMORY)) + .subAggregation(max(UTILIZATION_GPU_MEMORY, UTILIZATION_GPU_MEMORY)); + + // gpu memory usage aggregations + histogramBuilder + .subAggregation(average(USED_GPU_MEMORY, USED_GPU_MEMORY)) + .subAggregation(max(USED_GPU_MEMORY, USED_GPU_MEMORY)) + .subAggregation(min(USED_GPU_MEMORY, USED_GPU_MEMORY)); + return histogramBuilder; + } + + private MonitoringMetrics buildMetrics(final List aggregations, final String metricName) { + return MonitoringMetrics.builder() + .average(getDoubleValue(aggregations, AVG_AGGREGATION + metricName)) + .min(getDoubleValue(aggregations, MIN_AGGREGATION + metricName)) + .max(getDoubleValue(aggregations, MAX_AGGREGATION + metricName)) + .build(); + } +} diff --git a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/MemoryRequester.java b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/MemoryRequester.java index efcd86394f..03068aba06 100644 --- a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/MemoryRequester.java +++ b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/MemoryRequester.java @@ -64,8 +64,8 @@ public SearchRequest buildRequest(final Collection resourceIds, final Lo .aggregation(ordered(AggregationBuilders.terms(AGGREGATION_NODE_NAME)) .field(path(FIELD_METRICS_TAGS, FIELD_NODENAME_RAW)) .size(resourceIds.size()) - .subAggregation(average(AVG_AGGREGATION + MEMORY_CAPACITY, NODE_CAPACITY)) - .subAggregation(average(AVG_AGGREGATION + MEMORY_UTILIZATION, WORKING_SET)) + .subAggregation(average(MEMORY_CAPACITY, NODE_CAPACITY)) + .subAggregation(average(MEMORY_UTILIZATION, WORKING_SET)) .subAggregation(division(DIVISION_AGGREGATION + NODE_UTILIZATION, AVG_AGGREGATION + MEMORY_UTILIZATION, AVG_AGGREGATION + MEMORY_CAPACITY)))); @@ -83,9 +83,9 @@ protected SearchRequest buildStatsRequest(final String nodeName, final LocalDate statsQuery(nodeName, NODE, from, to) .size(0) .aggregation(dateHistogram(MEMORY_HISTOGRAM, interval) - .subAggregation(average(AVG_AGGREGATION + MEMORY_UTILIZATION, WORKING_SET)) - .subAggregation(max(MAX_AGGREGATION + MEMORY_UTILIZATION, WORKING_SET)) - .subAggregation(average(AVG_AGGREGATION + MEMORY_CAPACITY, NODE_CAPACITY)))); + .subAggregation(average(MEMORY_UTILIZATION, WORKING_SET)) + .subAggregation(max(MEMORY_UTILIZATION, WORKING_SET)) + .subAggregation(average(MEMORY_CAPACITY, NODE_CAPACITY)))); } @Override diff --git a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/NetworkRequester.java b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/NetworkRequester.java index cc20bcbe3b..35371c4a9a 100644 --- a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/NetworkRequester.java +++ b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/NetworkRequester.java @@ -83,8 +83,8 @@ protected SearchRequest buildStatsRequest(final String nodeName, final LocalDate statsQuery(nodeName, NODE, from, to) .size(0) .aggregation(dateHistogram(NETWORK_HISTOGRAM, interval) - .subAggregation(average(AVG_AGGREGATION + RX_RATE, RX_RATE)) - .subAggregation(average(AVG_AGGREGATION + TX_RATE, TX_RATE)))); + .subAggregation(average(RX_RATE, RX_RATE)) + .subAggregation(average(TX_RATE, TX_RATE)))); } @Override diff --git a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/PodFSRequester.java b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/PodFSRequester.java index 83de5bb970..c74f629d20 100644 --- a/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/PodFSRequester.java +++ b/api/src/main/java/com/epam/pipeline/dao/monitoring/metricrequester/PodFSRequester.java @@ -77,8 +77,8 @@ protected SearchRequest buildStatsRequest(final String nodeName, final LocalDate .aggregation(ordered(AggregationBuilders.terms(AGGREGATION_DISK_NAME)) .field(path(FIELD_METRICS_TAGS, RESOURCE_ID)) .subAggregation(dateHistogram(DISKS_HISTOGRAM, interval) - .subAggregation(average(AVG_AGGREGATION + USAGE, USAGE)) - .subAggregation(average(AVG_AGGREGATION + LIMIT, LIMIT))))); + .subAggregation(average(USAGE, USAGE)) + .subAggregation(average(LIMIT, LIMIT))))); } @Override diff --git a/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/ELKUsageMetric.java b/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/ELKUsageMetric.java index bde34e0d35..19e1af731c 100644 --- a/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/ELKUsageMetric.java +++ b/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/ELKUsageMetric.java @@ -27,7 +27,9 @@ public enum ELKUsageMetric { MEM("memory", "MemoryMetricsTimestamp"), FS("filesystem", "FilesystemMetricsTimestamp"), POD_FS("filesystem", "FilesystemMetricsTimestamp"), - NETWORK("network", "NetworkMetricsTimestamp"); + NETWORK("network", "NetworkMetricsTimestamp"), + GPU("gpu", "GpuMetricsTimestamp"), + GPU_AGGS("gpu_aggs", "GpuAggsMetricsTimestamp"); private final String name; private final String timestamp; diff --git a/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/MonitoringMetrics.java b/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/MonitoringMetrics.java new file mode 100644 index 0000000000..21abc1d30d --- /dev/null +++ b/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/MonitoringMetrics.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.entity.cluster.monitoring; + +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class MonitoringMetrics { + private Double average; + private Double min; + private Double max; +} diff --git a/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/MonitoringStats.java b/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/MonitoringStats.java index 54478eab8e..efac5461b7 100644 --- a/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/MonitoringStats.java +++ b/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/MonitoringStats.java @@ -19,6 +19,8 @@ import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; +import lombok.Data; +import lombok.Builder; import java.util.LinkedHashMap; import java.util.Map; @@ -39,6 +41,9 @@ public class MonitoringStats { private MemoryUsage memoryUsage; private DisksUsage disksUsage; private NetworkUsage networkUsage; + private GPUUsage gpuUsage; + private Map gpuDetails; + private String gpuDeviceName; @Setter @Getter @@ -90,4 +95,13 @@ public static class ContainerSpec { private long maxMemory; private int numberOfCores; } + + @Data + @Builder + public static class GPUUsage { + private MonitoringMetrics gpuUtilization; + private MonitoringMetrics gpuMemoryUtilization; + private MonitoringMetrics gpuMemoryUsed; + private MonitoringMetrics activeGpus; + } } diff --git a/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/gpu/GpuMetricsGranularity.java b/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/gpu/GpuMetricsGranularity.java new file mode 100644 index 0000000000..243d55a517 --- /dev/null +++ b/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/gpu/GpuMetricsGranularity.java @@ -0,0 +1,53 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.entity.cluster.monitoring.gpu; + +import java.util.List; + +/** + * Represents a levels of details for GPU usage metrics. + */ +public enum GpuMetricsGranularity { + /** + * Loads charts of usages by each GPU ID + */ + DETAILS, + /** + * Loads charts of aggregated usages across all GPU IDs + */ + AGGREGATIONS, + /** + * Loads aggregated usages for requested period of time + */ + GLOBAL, + /** + * Loads all levels described above + */ + ALL; + + public static boolean hasDetails(final List types) { + return types.contains(GpuMetricsGranularity.ALL) || types.contains(GpuMetricsGranularity.DETAILS); + } + + public static boolean hasAggregations(final List types) { + return types.contains(GpuMetricsGranularity.ALL) || types.contains(GpuMetricsGranularity.AGGREGATIONS); + } + + public static boolean hasGlobal(final List types) { + return types.contains(GpuMetricsGranularity.ALL) || types.contains(GpuMetricsGranularity.GLOBAL); + } +} diff --git a/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/gpu/GpuMonitoringStats.java b/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/gpu/GpuMonitoringStats.java new file mode 100644 index 0000000000..959caa300b --- /dev/null +++ b/api/src/main/java/com/epam/pipeline/entity/cluster/monitoring/gpu/GpuMonitoringStats.java @@ -0,0 +1,30 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.entity.cluster.monitoring.gpu; + +import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats; +import lombok.Builder; +import lombok.Data; + +import java.util.List; + +@Data +@Builder +public class GpuMonitoringStats { + private MonitoringStats global; + private List charts; +} diff --git a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/CAdvisorMonitoringManager.java b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/CAdvisorMonitoringManager.java index cb1ffef243..f685c07544 100644 --- a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/CAdvisorMonitoringManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/CAdvisorMonitoringManager.java @@ -22,6 +22,8 @@ import com.epam.pipeline.entity.cluster.NodeInstanceAddress; import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats; import com.epam.pipeline.entity.cluster.monitoring.RawMonitoringStats; +import com.epam.pipeline.entity.cluster.monitoring.gpu.GpuMetricsGranularity; +import com.epam.pipeline.entity.cluster.monitoring.gpu.GpuMonitoringStats; import com.epam.pipeline.manager.cluster.KubernetesManager; import com.epam.pipeline.manager.cluster.MonitoringReportType; import com.epam.pipeline.manager.cluster.NodesManager; @@ -114,6 +116,14 @@ public List getStatsForNode(final String nodeName, final LocalD return getStats(nodeName, start, end); } + @Override + public GpuMonitoringStats getGpuStatsForNode(final String nodeName, final LocalDateTime from, + final LocalDateTime to, + final List granularity, + final boolean squashCharts) { + throw new UnsupportedOperationException("GPU statistic is not available for CAdvisor metrics"); + } + @Override public InputStream getStatsForNodeAsInputStream(final String nodeName, final LocalDateTime from, diff --git a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ESMonitoringManager.java b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ESMonitoringManager.java index 43d53941f0..30f763967b 100644 --- a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ESMonitoringManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/ESMonitoringManager.java @@ -20,10 +20,14 @@ import com.epam.pipeline.common.MessageHelper; import com.epam.pipeline.dao.monitoring.MonitoringESDao; import com.epam.pipeline.dao.monitoring.metricrequester.AbstractMetricRequester; +import com.epam.pipeline.dao.monitoring.metricrequester.GPUAggregationRequester; +import com.epam.pipeline.dao.monitoring.metricrequester.GPUDetailsRequester; import com.epam.pipeline.dao.monitoring.metricrequester.HeapsterElasticRestHighLevelClient; import com.epam.pipeline.entity.cluster.NodeInstance; import com.epam.pipeline.entity.cluster.monitoring.ELKUsageMetric; import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats; +import com.epam.pipeline.entity.cluster.monitoring.gpu.GpuMetricsGranularity; +import com.epam.pipeline.entity.cluster.monitoring.gpu.GpuMonitoringStats; import com.epam.pipeline.entity.utils.DateUtils; import com.epam.pipeline.manager.cluster.KubernetesConstants; import com.epam.pipeline.manager.cluster.MonitoringReportType; @@ -32,7 +36,9 @@ import com.epam.pipeline.manager.preference.PreferenceManager; import com.epam.pipeline.manager.preference.SystemPreferences; import com.epam.pipeline.utils.CommonUtils; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; +import org.elasticsearch.ElasticsearchStatusException; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; import org.springframework.util.Assert; @@ -40,15 +46,18 @@ import java.io.InputStream; import java.time.Duration; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; @Service +@Slf4j @ConditionalOnProperty(name = "monitoring.backend", havingValue = "elastic") public class ESMonitoringManager implements UsageMonitoringManager { @@ -59,6 +68,8 @@ public class ESMonitoringManager implements UsageMonitoringManager { private static final int FALLBACK_INTERVALS_NUMBER = 10; private static final int TWO = 2; private static final String SWAP_FILESYSTEM = "tmpfs"; + private static final String HEAPSTER_INDEX_NAME_TOKEN = "heapster-"; + private static final String GPU_STAT_INDEX_NAME_TOKEN = "cp-gpu-monitor-"; private final HeapsterElasticRestHighLevelClient client; private final MonitoringESDao monitoringDao; @@ -85,7 +96,7 @@ public ESMonitoringManager(final HeapsterElasticRestHighLevelClient client, public List getStatsForNode(final String nodeName, final LocalDateTime from, final LocalDateTime to) { final LocalDateTime requestedStart = Optional.ofNullable(from).orElseGet(() -> creationDate(nodeName)); - final LocalDateTime oldestMonitoring = oldestMonitoringDate(); + final LocalDateTime oldestMonitoring = oldestMonitoringDate(HEAPSTER_INDEX_NAME_TOKEN); final LocalDateTime start = requestedStart.isAfter(oldestMonitoring) ? requestedStart : oldestMonitoring; final LocalDateTime end = Optional.ofNullable(to).orElseGet(DateUtils::nowUTC); final Duration interval = interval(start, end); @@ -94,6 +105,22 @@ public List getStatsForNode(final String nodeName, final LocalD : Collections.emptyList(); } + @Override + public GpuMonitoringStats getGpuStatsForNode(final String nodeName, final LocalDateTime from, + final LocalDateTime to, + final List granularity, + final boolean squashCharts) { + final LocalDateTime requestedStart = Optional.ofNullable(from).orElseGet(() -> creationDate(nodeName)); + final LocalDateTime oldestMonitoring = oldestMonitoringDate(GPU_STAT_INDEX_NAME_TOKEN); + final LocalDateTime start = requestedStart.isAfter(oldestMonitoring) ? requestedStart : oldestMonitoring; + final LocalDateTime end = Optional.ofNullable(to).orElseGet(DateUtils::nowUTC); + final Duration totalDuration = Duration.between(start, end); + final Duration interval = squashCharts ? totalDuration : interval(start, end); + return end.isAfter(start) && end.isAfter(oldestMonitoring) + ? getGpuStats(nodeName, start, end, interval, totalDuration, granularity) + : GpuMonitoringStats.builder().build(); + } + @Override public InputStream getStatsForNodeAsInputStream(final String nodeName, final LocalDateTime from, @@ -101,7 +128,7 @@ public InputStream getStatsForNodeAsInputStream(final String nodeName, final Duration interval, final MonitoringReportType type) { final LocalDateTime requestedStart = Optional.ofNullable(from).orElseGet(() -> creationDate(nodeName)); - final LocalDateTime oldestMonitoring = oldestMonitoringDate(); + final LocalDateTime oldestMonitoring = oldestMonitoringDate(HEAPSTER_INDEX_NAME_TOKEN); final LocalDateTime start = requestedStart.isAfter(oldestMonitoring) ? requestedStart : oldestMonitoring; final LocalDateTime end = Optional.ofNullable(to).orElseGet(DateUtils::nowUTC); final Duration minDuration = minimalDuration(); @@ -144,8 +171,8 @@ public long getDiskSpaceAvailable(final String nodeName, final String podId, fin return diskStats.getCapacity() - diskStats.getUsableSpace(); } - private LocalDateTime oldestMonitoringDate() { - return monitoringDao.oldestIndexDate().orElseGet(this::fallbackMonitoringStart); + private LocalDateTime oldestMonitoringDate(final String... indexPrefixes) { + return monitoringDao.oldestIndexDate(indexPrefixes).orElseGet(this::fallbackMonitoringStart); } private LocalDateTime creationDate(final String nodeName) { @@ -159,6 +186,29 @@ private LocalDateTime fallbackMonitoringStart() { return DateUtils.nowUTC().minus(FALLBACK_MONITORING_PERIOD); } + private GpuMonitoringStats getGpuStats(final String nodeName, final LocalDateTime start, final LocalDateTime end, + final Duration interval, final Duration totalDuration, + final List granularity) { + try { + final GPUAggregationRequester aggregationRequester = new GPUAggregationRequester(client); + final GpuMonitoringStats.GpuMonitoringStatsBuilder results = GpuMonitoringStats.builder(); + if (GpuMetricsGranularity.hasGlobal(granularity)) { + results.global(aggregationRequester.requestStats(nodeName, start, end, totalDuration).stream() + .findFirst() + .flatMap(stats -> statsWithinRegion(stats, start, end, totalDuration)) + .orElse(null)); + } + return results.charts(getGpuCharts(granularity, aggregationRequester, interval, nodeName, start, end)) + .build(); + } catch (ElasticsearchStatusException e) { + if (e.getDetailedMessage().contains("index_not_found_exception")) { + log.error("GPU monitor index doesn't exist for node '{}'", nodeName, e); + return GpuMonitoringStats.builder().build(); + } + throw e; + } + } + private List getStats(final String nodeName, final LocalDateTime start, final LocalDateTime end, final Duration interval) { return Stream.of(MONITORING_METRICS) @@ -201,6 +251,7 @@ private int numberOfIntervals() { private MonitoringStats mergeStats(final MonitoringStats first, final MonitoringStats second) { final Optional original = Optional.of(first); first.setCpuUsage(original.map(MonitoringStats::getCpuUsage).orElseGet(second::getCpuUsage)); + first.setGpuUsage(original.map(MonitoringStats::getGpuUsage).orElseGet(second::getGpuUsage)); first.setMemoryUsage(original.map(MonitoringStats::getMemoryUsage).orElseGet(second::getMemoryUsage)); first.setNetworkUsage(original.map(MonitoringStats::getNetworkUsage).orElseGet(second::getNetworkUsage)); if (first.getDisksUsage() != null && second.getDisksUsage() != null) { @@ -228,6 +279,13 @@ private MonitoringStats mergeStats(final MonitoringStats first, final Monitoring return first; } + private MonitoringStats mergeGpuStats(final MonitoringStats first, final MonitoringStats second) { + final Optional original = Optional.of(first); + first.setGpuUsage(original.map(MonitoringStats::getGpuUsage).orElseGet(second::getGpuUsage)); + first.setGpuDetails(original.map(MonitoringStats::getGpuDetails).orElseGet(second::getGpuDetails)); + return first; + } + private boolean isMonitoringStatsComplete(final MonitoringStats monitoringStats) { return monitoringStats.getCpuUsage() != null && monitoringStats.getMemoryUsage() != null @@ -262,4 +320,49 @@ private MonitoringStats.DisksUsage.DiskStats merged(final MonitoringStats.DisksU s3.setUsableSpace(s1.getUsableSpace() + s2.getUsableSpace()); return s3; } + + private List getGpuCharts(final List loadTypes, + final GPUAggregationRequester aggregationRequester, + final Duration interval, final String nodeName, + final LocalDateTime start, final LocalDateTime end) { + if (!GpuMetricsGranularity.hasAggregations(loadTypes) && !GpuMetricsGranularity.hasDetails(loadTypes)) { + return null; + } + final List charts = new ArrayList<>(); + if (GpuMetricsGranularity.hasAggregations(loadTypes)) { + charts.addAll(requestCharts(aggregationRequester, interval, nodeName, start, end)); + } + if (GpuMetricsGranularity.hasDetails(loadTypes)) { + final GPUDetailsRequester detailsRequester = new GPUDetailsRequester(client); + charts.addAll(requestCharts(detailsRequester, interval, nodeName, start, end)); + } + if (GpuMetricsGranularity.hasDetails(loadTypes) && GpuMetricsGranularity.hasAggregations(loadTypes)) { + return charts.stream() + .collect(Collectors.groupingBy(MonitoringStats::getStartTime, + Collectors.reducing(this::mergeGpuStats))) + .values().stream() + .filter(Optional::isPresent) + .map(Optional::get) + .filter(stats -> stats.getGpuUsage() != null && stats.getGpuDetails() != null) + .map(stats -> statsWithinRegion(stats, start, end, interval)) + .filter(Optional::isPresent) + .map(Optional::get) + .sorted(Comparator.comparing(MonitoringStats::getStartTime, + Comparator.comparing(this::asMonitoringDateTime))) + .collect(Collectors.toList()); + } + return charts.stream() + .sorted(Comparator.comparing(MonitoringStats::getStartTime, + Comparator.comparing(this::asMonitoringDateTime))) + .collect(Collectors.toList()); + } + + private List requestCharts(final AbstractMetricRequester requester, final Duration interval, + final String nodeName, final LocalDateTime start, + final LocalDateTime end) { + return requester.requestStats(nodeName, start, end, interval).stream() + .map(stats -> statsWithinRegion(stats, start, end, interval).orElse(null)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } } diff --git a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/UsageMonitoringManager.java b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/UsageMonitoringManager.java index 870c2f4bee..10482d9fca 100644 --- a/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/UsageMonitoringManager.java +++ b/api/src/main/java/com/epam/pipeline/manager/cluster/performancemonitoring/UsageMonitoringManager.java @@ -17,6 +17,8 @@ package com.epam.pipeline.manager.cluster.performancemonitoring; import com.epam.pipeline.entity.cluster.monitoring.MonitoringStats; +import com.epam.pipeline.entity.cluster.monitoring.gpu.GpuMetricsGranularity; +import com.epam.pipeline.entity.cluster.monitoring.gpu.GpuMonitoringStats; import com.epam.pipeline.manager.cluster.MonitoringReportType; import javax.annotation.Nullable; @@ -52,6 +54,22 @@ List getStatsForNode(String nodeName, @Nullable LocalDateTime from, @Nullable LocalDateTime to); + /** + * Retrieves GPU monitoring stats for node. + * + * @param nodeName Cluster node name. + * @param from Minimal date for collecting stats. + * @param to Maximal date for collecting stats. + * @param granularity the list of granularity levels to load GPU usages + * @param squashCharts if specified charts shall be squashed into one + * @return GPU usage statistics. + */ + GpuMonitoringStats getGpuStatsForNode(String nodeName, + @Nullable LocalDateTime from, + @Nullable LocalDateTime to, + List granularity, + boolean squashCharts); + /** * Retrieves monitoring stats for node as input stream. * diff --git a/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java b/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java index e7fccff8be..c8ed85cedf 100644 --- a/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java +++ b/api/src/main/java/com/epam/pipeline/manager/preference/SystemPreferences.java @@ -1419,6 +1419,13 @@ public class SystemPreferences { public static final IntPreference MONITORING_POOL_USAGE_STORE_DAYS = new IntPreference( "monitoring.node.pool.usage.store.days", 365, MONITORING_GROUP, pass); + public static final IntPreference MONITORING_GPU_USAGE_DELAY = new IntPreference( + "monitoring.gpu.usage.delay", 60000, MONITORING_GROUP, isGreaterThan(0)); + + public static final BooleanPreference MONITORING_GPU_USAGE_ENABLE = new BooleanPreference( + "monitoring.gpu.usage.enable", false, MONITORING_GROUP, pass); + + // Cloud public static final ObjectPreference> CLOUD_ACCESS_MANAGEMENT_CONFIG = new ObjectPreference<>( diff --git a/build.gradle b/build.gradle index 13e5b22cfb..a9b36304a5 100644 --- a/build.gradle +++ b/build.gradle @@ -147,6 +147,11 @@ task installDist() { into "$distFolder/search-mappings" } + copy { + from "$rootDir/monitoring-service/src/main/resources/templates" + into "$distFolder/monitoring-index-mappings" + } + copy { from "$rootDir/vm-monitor/src/main/resources/templates" into "$distFolder/vm-monitor-templates" diff --git a/cloud-pipeline-common/model/src/main/java/com/epam/pipeline/client/pipeline/CloudPipelineAPI.java b/cloud-pipeline-common/model/src/main/java/com/epam/pipeline/client/pipeline/CloudPipelineAPI.java index ac08bc72c3..6dc86cc38f 100644 --- a/cloud-pipeline-common/model/src/main/java/com/epam/pipeline/client/pipeline/CloudPipelineAPI.java +++ b/cloud-pipeline-common/model/src/main/java/com/epam/pipeline/client/pipeline/CloudPipelineAPI.java @@ -293,6 +293,9 @@ Call>> loadRepositoryContent(@Path(ID) Long id, @GET("cluster/instance/loadAll") Call>> loadAllInstanceTypesForRegion(@Query(REGION_ID) Long regionId); + @GET("cluster/instance/loadAll") + Call>> loadAllInstanceTypes(); + @GET("cluster/node/{id}/disks") Call>> loadNodeDisks(@Path(ID) String nodeId); diff --git a/cloud-pipeline-common/model/src/main/java/com/epam/pipeline/client/reporter/NodeReporterClient.java b/cloud-pipeline-common/model/src/main/java/com/epam/pipeline/client/reporter/NodeReporterClient.java index 889c2d03a8..13cbee1d3d 100644 --- a/cloud-pipeline-common/model/src/main/java/com/epam/pipeline/client/reporter/NodeReporterClient.java +++ b/cloud-pipeline-common/model/src/main/java/com/epam/pipeline/client/reporter/NodeReporterClient.java @@ -1,10 +1,31 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.epam.pipeline.client.reporter; +import com.epam.pipeline.entity.reporter.NodeReporterGpuUsages; import com.epam.pipeline.entity.reporter.NodeReporterHostStats; import retrofit2.Call; import retrofit2.http.GET; +import java.util.List; public interface NodeReporterClient { @GET("/") Call load(); + + @GET("/gpus") + Call> loadGpuStats(); } diff --git a/cloud-pipeline-common/model/src/main/java/com/epam/pipeline/entity/reporter/NodeReporterGpuUsages.java b/cloud-pipeline-common/model/src/main/java/com/epam/pipeline/entity/reporter/NodeReporterGpuUsages.java new file mode 100644 index 0000000000..4f56e5f7a0 --- /dev/null +++ b/cloud-pipeline-common/model/src/main/java/com/epam/pipeline/entity/reporter/NodeReporterGpuUsages.java @@ -0,0 +1,35 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.entity.reporter; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class NodeReporterGpuUsages { + String name; + String index; + Integer utilizationGpu; + Integer memoryUsed; + Integer memoryTotal; + Integer memoryUtilization; +} diff --git a/deploy/docker/cp-monitoring-srv/Dockerfile b/deploy/docker/cp-monitoring-srv/Dockerfile index d0bf9a3dbd..08c9c648d1 100644 --- a/deploy/docker/cp-monitoring-srv/Dockerfile +++ b/deploy/docker/cp-monitoring-srv/Dockerfile @@ -29,12 +29,14 @@ RUN yum install -y \ # API distribution ARG CP_API_DIST_URL="" ENV CP_MONITORING_HOME="/opt/monitoring" +ENV CP_MONITOR_SRV_INDEX_MAPPINGS_LOCATION="$CP_MONITORING_HOME/monitoring-index-mappings" RUN cd /tmp && \ wget -q "$CP_API_DIST_URL" -O cloud-pipeline.tgz && \ tar -zxf cloud-pipeline.tgz && \ mkdir -p $CP_MONITORING_HOME && \ mv bin/monitoring-service.jar $CP_MONITORING_HOME/ && \ + mv bin/monitoring-index-mappings $CP_MONITOR_SRV_INDEX_MAPPINGS_LOCATION && \ rm -rf /tmp/* ADD config $CP_MONITORING_HOME/config diff --git a/deploy/docker/cp-monitoring-srv/config/application.properties b/deploy/docker/cp-monitoring-srv/config/application.properties index 81054f7f7a..e3e8ce3b8d 100644 --- a/deploy/docker/cp-monitoring-srv/config/application.properties +++ b/deploy/docker/cp-monitoring-srv/config/application.properties @@ -18,3 +18,21 @@ preference.name.usage.node.pool.enable=monitoring.node.pool.usage.enable preference.name.usage.node.pool.clean.enable=monitoring.node.pool.usage.clean.enable preference.name.usage.node.pool.clean.delay=monitoring.node.pool.usage.clean.delay preference.name.usage.node.pool.store.period=monitoring.node.pool.usage.store.days +preference.name.usage.node.gpu.delay=monitoring.gpu.usage.delay +preference.name.usage.node.gpu.enable=monitoring.gpu.usage.enable + +# Elasticsearch +es.port=${CP_HEAPSTER_ELK_INTERNAL_PORT:30094} +es.host=${CP_HEAPSTER_ELK_INTERNAL_HOST:cp-heapster-elk.default.svc.cluster.local} +es.gpu.monitor.index.prefix=${CP_MONITOR_SRV_GPU_USAGE_INDEX_PREFIX:cp-gpu-monitor-} +es.gpu.monitor.index.mappings=file://${CP_MONITOR_SRV_INDEX_MAPPINGS_LOCATION}/cp_monitor_gpu_index.json +es.index.bulk.size=1000 + +# Node Reporter Service +node.reporter.srv.pod.name=${CP_VM_MONITOR_NODE_REPORTING_POD_NAME:cp-node-reporter} +node.reporter.srv.namespace=${CP_VM_MONITOR_NODE_NAMESPACE:default} +node.reporter.srv.schema=${CP_VM_MONITOR_NODE_STATS_REQUEST_SCHEMA:http} +node.reporter.srv.port=${CP_VM_MONITOR_NODE_STATS_REQUEST_PORT:8000} + +# Monitor Gpu Usages +monitoring.gpu.usage.pool.size=${CP_MONITOR_SRV_GPU_USAGE_POOL_SIZE:1} diff --git a/deploy/docker/cp-node-reporter/stats/reporter.py b/deploy/docker/cp-node-reporter/stats/reporter.py index ec8f83187a..ed39f7ccfc 100644 --- a/deploy/docker/cp-node-reporter/stats/reporter.py +++ b/deploy/docker/cp-node-reporter/stats/reporter.py @@ -23,7 +23,7 @@ from collections import namedtuple from enum import Enum, auto from logging.handlers import TimedRotatingFileHandler - +import subprocess import psutil from flask import Flask @@ -144,6 +144,33 @@ def view(self, stats): return host_view +class GPUStatProcessor: + + def __init__(self, metrics): + self.metrics = metrics + self.header = metrics.split(',') + + def get_stat(self): + try: + process = subprocess.Popen(['nvidia-smi', + f'--query-gpu={self.metrics}', + '--format=csv,noheader,nounits'], + stderr=subprocess.PIPE, stdout=subprocess.PIPE) + stdout = process.stdout + stderr = process.stderr + exit_code = process.wait() + if exit_code != 0: + raise RuntimeError(f"Process finished with exit code '{exit_code}': {stderr}") + for stdout_line in stdout.readlines(): + result_metrics = str(stdout_line.decode().strip()).split(', ') + yield { + self.header[i].replace('.', '_'): str(result_metrics[i]).strip() for i in range(len(self.header)) + } + except FileNotFoundError: + # nvidia-smi not installed + yield {} + + logging_format = os.getenv('CP_LOGGING_FORMAT', default='%(asctime)s [%(threadName)s] [%(levelname)s] %(message)s') logging_level = os.getenv('CP_LOGGING_LEVEL', default='DEBUG') logging_file = os.getenv('CP_LOGGING_FILE', default='stats.log') @@ -172,6 +199,11 @@ def view(self, stats): ProcOpenFilesResolver(include=procs_include)]) viewer = JsonStatsViewer(host=host) +gpu_metrics = os.getenv( + 'CP_NODE_REPORTER_GPU_STATS_METRIX', + 'name,index,utilization.gpu,memory.total,memory.used') +gpu_processor = GPUStatProcessor(gpu_metrics) + logging.info('Initializing...') app = Flask(__name__) @@ -181,3 +213,8 @@ def get_stats(): stats = collector.collect() view = viewer.view(stats) return json.dumps(view, indent=4) + + +@app.route('/gpus') +def get_gpus(): + return json.dumps([o for o in gpu_processor.get_stat()], indent=1) diff --git a/monitoring-service/build.gradle b/monitoring-service/build.gradle index 2d7dd73228..c0abb5704e 100644 --- a/monitoring-service/build.gradle +++ b/monitoring-service/build.gradle @@ -69,7 +69,15 @@ dependencies { implementation group: "com.fasterxml.jackson.core", name: "jackson-databind", version: jaksonVersion // RxJava - compile group: "io.reactivex.rxjava2", name: "rxjava", version: rxjavaVersion + implementation group: "io.reactivex.rxjava2", name: "rxjava", version: rxjavaVersion + + //K8s + implementation group: "io.fabric8", name: "kubernetes-client", version: k8sLibraryVersion + + // Elasticsearch + implementation group: "org.elasticsearch", name: "elasticsearch", version: elasticsearchVersion + implementation group: "org.elasticsearch.client", name: "elasticsearch-rest-high-level-client", version: elasticsearchVersion + testImplementation group: "org.elasticsearch.client", name: "transport", version: elasticsearchVersion //Tests testImplementation group: "org.springframework.boot", name: "spring-boot-starter-test", version: springBootVersion diff --git a/monitoring-service/gradle.properties b/monitoring-service/gradle.properties index 6b6386c242..4ebe8ae6c7 100644 --- a/monitoring-service/gradle.properties +++ b/monitoring-service/gradle.properties @@ -23,3 +23,5 @@ junitVersion=5.2.0 mockitoVersion=2.21.0 hamcrestVersion=1.3 rxjavaVersion=2.1.12 +k8sLibraryVersion=4.6.1 +elasticsearchVersion=6.8.3 diff --git a/monitoring-service/src/main/java/com/epam/pipeline/monitor/config/ElasticsearchConfiguration.java b/monitoring-service/src/main/java/com/epam/pipeline/monitor/config/ElasticsearchConfiguration.java new file mode 100644 index 0000000000..0b867b6613 --- /dev/null +++ b/monitoring-service/src/main/java/com/epam/pipeline/monitor/config/ElasticsearchConfiguration.java @@ -0,0 +1,49 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.monitor.config; + +import org.apache.http.HttpHost; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ElasticsearchConfiguration { + + @Value("${es.host}") + private String elasticsearchUrl; + @Value("${es.port}") + private int elasticsearchPort; + + @Bean + public RestHighLevelClient elasticsearchClient(final RestClientBuilder lowLevelClientBuilder) { + return new RestHighLevelClient(lowLevelClientBuilder); + } + + @Bean + public RestClient lowLevelClient(final RestClientBuilder lowLevelClientBuilder) { + return lowLevelClientBuilder.build(); + } + + @Bean + public RestClientBuilder lowLevelClientBuilder() { + return RestClient.builder(new HttpHost(elasticsearchUrl, elasticsearchPort, "http")); + } +} diff --git a/monitoring-service/src/main/java/com/epam/pipeline/monitor/config/MonitoringConfiguration.java b/monitoring-service/src/main/java/com/epam/pipeline/monitor/config/MonitoringConfiguration.java index 6b0183c4e0..8ad115dbf7 100644 --- a/monitoring-service/src/main/java/com/epam/pipeline/monitor/config/MonitoringConfiguration.java +++ b/monitoring-service/src/main/java/com/epam/pipeline/monitor/config/MonitoringConfiguration.java @@ -17,6 +17,7 @@ package com.epam.pipeline.monitor.config; import com.epam.pipeline.monitor.monitoring.SchedulingService; +import com.epam.pipeline.monitor.monitoring.node.GpuUsageMonitoringService; import com.epam.pipeline.monitor.monitoring.pool.NodePoolMonitoringService; import com.epam.pipeline.monitor.monitoring.pool.NodePoolUsageCleanerService; import com.epam.pipeline.monitor.monitoring.user.OnlineUsersCleanerService; @@ -75,4 +76,15 @@ public SchedulingService onlineUsersCleaner(final TaskScheduler scheduler, return new SchedulingService(scheduler, monitoringService, client, monitorDelayPreferenceName, preferencesService, "UsageUserCleaner"); } + + @Bean + public SchedulingService gpuUsageMonitor(final TaskScheduler scheduler, + final GpuUsageMonitoringService monitoringService, + final CloudPipelineAPIClient client, + @Value("${preference.name.usage.node.gpu.delay}") + final String monitorDelayPreferenceName, + final PreferencesService preferencesService) { + return new SchedulingService(scheduler, monitoringService, client, monitorDelayPreferenceName, + preferencesService, "NodeGpuUsageMonitor"); + } } diff --git a/monitoring-service/src/main/java/com/epam/pipeline/monitor/model/node/GpuUsageStats.java b/monitoring-service/src/main/java/com/epam/pipeline/monitor/model/node/GpuUsageStats.java new file mode 100644 index 0000000000..67756375bf --- /dev/null +++ b/monitoring-service/src/main/java/com/epam/pipeline/monitor/model/node/GpuUsageStats.java @@ -0,0 +1,34 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.monitor.model.node; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class GpuUsageStats { + private String deviceName; + private GpuUsageSummary average; + private GpuUsageSummary min; + private GpuUsageSummary max; + private Integer activeGpusUtilization; +} diff --git a/monitoring-service/src/main/java/com/epam/pipeline/monitor/model/node/GpuUsageSummary.java b/monitoring-service/src/main/java/com/epam/pipeline/monitor/model/node/GpuUsageSummary.java new file mode 100644 index 0000000000..bd739d21f5 --- /dev/null +++ b/monitoring-service/src/main/java/com/epam/pipeline/monitor/model/node/GpuUsageSummary.java @@ -0,0 +1,28 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.monitor.model.node; + +import lombok.Builder; +import lombok.Value; + +@Value +@Builder +public class GpuUsageSummary { + Integer gpuUtilization; + Integer memoryUtilization; + Integer memoryUsage; +} diff --git a/monitoring-service/src/main/java/com/epam/pipeline/monitor/model/node/GpuUsages.java b/monitoring-service/src/main/java/com/epam/pipeline/monitor/model/node/GpuUsages.java new file mode 100644 index 0000000000..5c0f9bad6d --- /dev/null +++ b/monitoring-service/src/main/java/com/epam/pipeline/monitor/model/node/GpuUsages.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.monitor.model.node; + +import com.epam.pipeline.entity.reporter.NodeReporterGpuUsages; +import lombok.Builder; +import lombok.Data; + +import java.time.LocalDateTime; +import java.util.List; + +@Data +@Builder +public class GpuUsages { + private List usages; + private String nodename; + private LocalDateTime timestamp; + private GpuUsageStats stats; +} diff --git a/monitoring-service/src/main/java/com/epam/pipeline/monitor/monitoring/node/GpuUsageMonitoringService.java b/monitoring-service/src/main/java/com/epam/pipeline/monitor/monitoring/node/GpuUsageMonitoringService.java new file mode 100644 index 0000000000..ff67fe3eac --- /dev/null +++ b/monitoring-service/src/main/java/com/epam/pipeline/monitor/monitoring/node/GpuUsageMonitoringService.java @@ -0,0 +1,152 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.monitor.monitoring.node; + +import com.epam.pipeline.entity.reporter.NodeReporterGpuUsages; +import com.epam.pipeline.monitor.model.node.GpuUsageSummary; +import com.epam.pipeline.monitor.model.node.GpuUsageStats; +import com.epam.pipeline.monitor.model.node.GpuUsages; +import com.epam.pipeline.monitor.monitoring.MonitoringService; +import com.epam.pipeline.monitor.rest.CloudPipelineAPIClient; +import com.epam.pipeline.monitor.service.InstanceTypesLoader; +import com.epam.pipeline.monitor.service.elasticsearch.MonitoringElasticsearchService; +import com.epam.pipeline.monitor.service.reporter.NodeReporterService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.IntSummaryStatistics; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +@Service +@Slf4j +public class GpuUsageMonitoringService implements MonitoringService { + private static final int TO_PERCENTAGE = 100; + + private final String monitorEnabledPreferenceName; + private final CloudPipelineAPIClient cloudPipelineClient; + private final NodeReporterService nodeReporterService; + private final MonitoringElasticsearchService monitoringElasticsearchService; + private final InstanceTypesLoader instanceTypesLoader; + + public GpuUsageMonitoringService(@Value("${preference.name.usage.node.gpu.enable}") + final String monitorEnabledPreferenceName, + final CloudPipelineAPIClient cloudPipelineClient, + final NodeReporterService nodeReporterService, + final MonitoringElasticsearchService monitoringElasticsearchService, + final InstanceTypesLoader instanceTypesLoader) { + this.monitorEnabledPreferenceName = monitorEnabledPreferenceName; + this.cloudPipelineClient = cloudPipelineClient; + this.nodeReporterService = nodeReporterService; + this.monitoringElasticsearchService = monitoringElasticsearchService; + this.instanceTypesLoader = instanceTypesLoader; + } + + @Override + public void monitor() { + if (!cloudPipelineClient.getBooleanPreference(monitorEnabledPreferenceName)) { + log.debug("Gpu usage monitor is not enabled"); + return; + } + log.info("Collecting gpu usages..."); + final Set gpuInstanceTypes = instanceTypesLoader.loadGpuInstanceTypes(); + final List usages = nodeReporterService.collectGpuUsages(gpuInstanceTypes); + + if (CollectionUtils.isEmpty(usages)) { + log.info("Gpu usages not found. Finishing gpu usages monitoring."); + return; + } + final List usagesWithSummaries = usages.stream() + .map(this::tryFillSummaries) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + monitoringElasticsearchService.saveGpuUsages(usagesWithSummaries); + log.info("Finishing gpu usages monitoring."); + } + + @SuppressWarnings("PMD.AvoidCatchingGenericException") + private GpuUsages tryFillSummaries(final GpuUsages usage) { + try { + if (CollectionUtils.isEmpty(usage.getUsages())) { + return null; + } + usage.setUsages(usage.getUsages().stream() + .peek(value -> value.setMemoryUtilization( + calculateUtilization(value.getMemoryUsed(), value.getMemoryTotal()))) + .collect(Collectors.toList())); + usage.setStats(collectSummaries(usage.getUsages())); + return usage; + } catch (Exception e) { + log.error("Failed to collect gpu statistics.", e); + return null; + } + } + + private GpuUsageStats collectSummaries(final List usages) { + final IntSummaryStatistics gpuUtilizationSummary = usages.stream() + .mapToInt(NodeReporterGpuUsages::getUtilizationGpu) + .summaryStatistics(); + final IntSummaryStatistics memoryUtilizationSummary = usages.stream() + .mapToInt(NodeReporterGpuUsages::getMemoryUtilization) + .summaryStatistics(); + final IntSummaryStatistics memoryUsageSummary = usages.stream() + .mapToInt(NodeReporterGpuUsages::getMemoryUsed) + .summaryStatistics(); + + final GpuUsageSummary averagesSummary = GpuUsageSummary.builder() + .gpuUtilization((int) Math.round(gpuUtilizationSummary.getAverage())) + .memoryUtilization((int) Math.round(memoryUtilizationSummary.getAverage())) + .memoryUsage((int) Math.round(memoryUsageSummary.getAverage())) + .build(); + final GpuUsageSummary minSummary = GpuUsageSummary.builder() + .gpuUtilization(gpuUtilizationSummary.getMin()) + .memoryUtilization(memoryUtilizationSummary.getMin()) + .memoryUsage(memoryUsageSummary.getMin()) + .build(); + final GpuUsageSummary maxSummary = GpuUsageSummary.builder() + .gpuUtilization(gpuUtilizationSummary.getMax()) + .memoryUtilization(memoryUtilizationSummary.getMax()) + .memoryUsage(memoryUsageSummary.getMax()) + .build(); + + final long activeDevices = usages.stream() + .map(NodeReporterGpuUsages::getUtilizationGpu) + .filter(value -> value > 0) + .count(); + + final String deviceName = usages.stream() + .findFirst() + .map(NodeReporterGpuUsages::getName) + .orElse(null); + + return GpuUsageStats.builder() + .deviceName(deviceName) + .average(averagesSummary) + .min(minSummary) + .max(maxSummary) + .activeGpusUtilization(calculateUtilization((int) activeDevices, usages.size())) + .build(); + } + + private Integer calculateUtilization(final int value, final int totalValue) { + return (int) (((double) value / totalValue) * TO_PERCENTAGE); + } +} diff --git a/monitoring-service/src/main/java/com/epam/pipeline/monitor/rest/CloudPipelineAPIClient.java b/monitoring-service/src/main/java/com/epam/pipeline/monitor/rest/CloudPipelineAPIClient.java index 62b38fad21..ecd2737173 100644 --- a/monitoring-service/src/main/java/com/epam/pipeline/monitor/rest/CloudPipelineAPIClient.java +++ b/monitoring-service/src/main/java/com/epam/pipeline/monitor/rest/CloudPipelineAPIClient.java @@ -19,6 +19,7 @@ import com.epam.pipeline.client.pipeline.CloudPipelineAPI; import com.epam.pipeline.client.pipeline.CloudPipelineApiBuilder; import com.epam.pipeline.client.pipeline.CloudPipelineApiExecutor; +import com.epam.pipeline.entity.cluster.InstanceType; import com.epam.pipeline.entity.cluster.pool.NodePool; import com.epam.pipeline.entity.pipeline.PipelineRun; import com.epam.pipeline.entity.preference.Preference; @@ -90,4 +91,8 @@ public List saveNodePoolUsage(final List records) public boolean deleteExpiredNodePoolUsage(final LocalDate date) { return executor.execute(cloudPipelineAPI.deleteExpiredNodePoolUsage(date)); } + + public List loadAllInstanceTypes() { + return ListUtils.emptyIfNull(executor.execute(cloudPipelineAPI.loadAllInstanceTypes())); + } } diff --git a/monitoring-service/src/main/java/com/epam/pipeline/monitor/rest/NodeReporterAPIExecutor.java b/monitoring-service/src/main/java/com/epam/pipeline/monitor/rest/NodeReporterAPIExecutor.java new file mode 100644 index 0000000000..9a1122aa2c --- /dev/null +++ b/monitoring-service/src/main/java/com/epam/pipeline/monitor/rest/NodeReporterAPIExecutor.java @@ -0,0 +1,63 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.monitor.rest; + +import com.epam.pipeline.client.CommonRetrofitClientBuilder; +import com.epam.pipeline.client.RetrofitClientBuilder; +import com.epam.pipeline.client.RetrofitExecutor; +import com.epam.pipeline.client.SyncRetrofitExecutor; +import com.epam.pipeline.client.reporter.NodeReporterClient; +import com.epam.pipeline.config.Constants; +import com.epam.pipeline.config.JsonMapper; +import com.epam.pipeline.entity.reporter.NodeReporterGpuUsages; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategy; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.net.Proxy; +import java.text.SimpleDateFormat; +import java.util.List; + +@Service +public class NodeReporterAPIExecutor { + + private static final ObjectMapper DEFAULT_MAPPER = new JsonMapper() + .setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE) + .setDateFormat(new SimpleDateFormat(Constants.FMT_ISO_LOCAL_DATE)) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + private final RetrofitExecutor executor = new SyncRetrofitExecutor(); + private final RetrofitClientBuilder builder = new CommonRetrofitClientBuilder(); + private final String schema; + private final int port; + + public NodeReporterAPIExecutor(@Value("${node.reporter.srv.schema}") final String schema, + @Value("${node.reporter.srv.port}") final int port) { + this.schema = schema; + this.port = port; + } + + public List loadGpuStats(final String host) { + return executor.execute(getClient(host).loadGpuStats()); + } + + private NodeReporterClient getClient(final String host) { + return builder.build(NodeReporterClient.class, schema, host, port, DEFAULT_MAPPER, Proxy.NO_PROXY); + } +} diff --git a/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/InstanceTypesLoader.java b/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/InstanceTypesLoader.java new file mode 100644 index 0000000000..38d7a25d23 --- /dev/null +++ b/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/InstanceTypesLoader.java @@ -0,0 +1,59 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.monitor.service; + +import com.epam.pipeline.entity.cluster.InstanceType; +import com.epam.pipeline.monitor.rest.CloudPipelineAPIClient; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * Refreshes GPU instance types + */ +@Service +@RequiredArgsConstructor +public class InstanceTypesLoader { + private final CloudPipelineAPIClient client; + private final Set gpuInstanceTypes = ConcurrentHashMap.newKeySet(); + + @PostConstruct + public void init() { + refreshInstances(); + } + + @Scheduled(fixedDelayString = "${refresh.instances.timeout:86400000}") + public void refreshInstances() { + final Set loadedTypes = client.loadAllInstanceTypes().stream() + .filter(it -> it.getGpu() > 0) + .map(InstanceType::getName) + .filter(StringUtils::isNotBlank) + .collect(Collectors.toSet()); + gpuInstanceTypes.clear(); + gpuInstanceTypes.addAll(loadedTypes); + } + + public Set loadGpuInstanceTypes() { + return gpuInstanceTypes; + } +} diff --git a/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/elasticsearch/ElasticsearchService.java b/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/elasticsearch/ElasticsearchService.java new file mode 100644 index 0000000000..7fb230a64e --- /dev/null +++ b/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/elasticsearch/ElasticsearchService.java @@ -0,0 +1,125 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.monitor.service.elasticsearch; + +import io.swagger.models.HttpMethod; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.ListUtils; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.rest.RestStatus; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@Service +@Slf4j +public class ElasticsearchService { + + private final RestHighLevelClient elasticsearchClient; + private final int bulkSize; + + public ElasticsearchService(final RestHighLevelClient elasticsearchClient, + @Value("${es.index.bulk.size:1000}") final int bulkSize) { + this.elasticsearchClient = elasticsearchClient; + this.bulkSize = bulkSize; + } + + public void createIndexIfNotExists(final String indexName, final String source) { + log.debug("Start to create Elasticsearch index ..."); + try { + if (isIndexExists(indexName)) { + log.debug("Index with name {} already exists", indexName); + return; + } + final Request request = new Request(HttpMethod.PUT.name(), indexName); + request.setJsonEntity(source); + + final Response response = elasticsearchClient.getLowLevelClient().performRequest(request); + final int statusCode = response.getStatusLine().getStatusCode(); + Assert.isTrue(statusCode == RestStatus.OK.getStatus(), + "Create Elasticsearch index: " + response); + } catch (IOException e) { + throw new ElasticsearchException("Failed to create index request: " + e.getMessage(), e); + } + log.debug("Elasticsearch index with name {} was created.", indexName); + } + + public void insertBulkDocuments(final List indexRequests) { + ListUtils.partition(indexRequests, bulkSize).forEach(this::insertDocuments); + } + + private void insertDocuments(final List indexRequests) { + try { + final BulkRequest request = new BulkRequest(); + indexRequests.forEach(request::add); + final BulkResponse response = elasticsearchClient.bulk(request); + validateBulkResponse(response); + } catch (IOException e) { + log.error("Partial error during index sync: {}.", e.getMessage()); + } + } + + private void validateBulkResponse(final BulkResponse response) { + Assert.state(response.status() == RestStatus.OK, + "Failed to create Elasticsearch documents: " + response); + + if (ObjectUtils.isEmpty(response)) { + log.debug("No documents were created in Elasticsearch."); + return; + } + + final Map> indexResults = Arrays.stream(response.getItems()) + .collect(Collectors.partitioningBy(BulkItemResponse::isFailed)); + final List failed = indexResults.get(true); + if (CollectionUtils.isNotEmpty(failed)) { + failed.forEach(item -> log.error( + "Error for doc {} index {}: {}.", item.getId(), item.getIndex(), item.getFailureMessage())); + } + } + + private boolean isIndexExists(final String indexName) { + try { + final Request request = new Request(HttpMethod.HEAD.name(), indexName); + final Response response = elasticsearchClient.getLowLevelClient().performRequest(request); + final int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == RestStatus.NOT_FOUND.getStatus()) { + return false; + } + if (statusCode == RestStatus.OK.getStatus()) { + return true; + } + throw new ElasticsearchException("Failed to send the request to checks index " + response); + } catch (IOException e) { + throw new ElasticsearchException("Failed to send the request to checks index " + e.getMessage(), e); + } + } +} diff --git a/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/elasticsearch/GpuMonitorIndexHelper.java b/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/elasticsearch/GpuMonitorIndexHelper.java new file mode 100644 index 0000000000..5b1d98e570 --- /dev/null +++ b/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/elasticsearch/GpuMonitorIndexHelper.java @@ -0,0 +1,166 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.monitor.service.elasticsearch; + +import com.epam.pipeline.entity.reporter.NodeReporterGpuUsages; +import com.epam.pipeline.monitor.model.node.GpuUsageStats; +import com.epam.pipeline.monitor.model.node.GpuUsageSummary; +import com.epam.pipeline.monitor.model.node.GpuUsages; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.apache.commons.collections4.ListUtils; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; + +import java.io.IOException; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +@SuppressWarnings({"HideUtilityClassConstructor"}) +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class GpuMonitorIndexHelper { + + private static final String GPU_INDEX_TYPE = "gpu"; + private static final String GPU_AGGS_INDEX_TYPE = "gpu_aggs"; + private static final String GPU_TIMESTAMP_FIELD = "GpuMetricsTimestamp"; + private static final String GPU_STATS_TIMESTAMP_FIELD = "GpuAggsMetricsTimestamp"; + private static final String METRICS_FIELD = "Metrics"; + private static final String TAGS_FIELD = "MetricsTags"; + private static final String GPU_UTILIZATION_FIELD = "gpu/utilization_gpu"; + private static final String MEMORY_UTILIZATION_FIELD = "gpu/utilization_memory"; + private static final String MEMORY_USED_FIELD = "gpu/used_memory"; + private static final String ACTIVE_GPUS_FIELD = "gpu_aggs/active_gpus"; + private static final String AVG_GPU_UTILIZATION_FIELD = "gpu_aggs/avg_utilization_gpu"; + private static final String AVG_MEMORY_UTILIZATION_FIELD = "gpu_aggs/avg_utilization_memory"; + private static final String AVG_MEMORY_USED_FIELD = "gpu_aggs/avg_used_memory"; + private static final String MIN_GPU_UTILIZATION_FIELD = "gpu_aggs/min_utilization_gpu"; + private static final String MIN_MEMORY_UTILIZATION_FIELD = "gpu_aggs/min_utilization_memory"; + private static final String MIN_MEMORY_USED_FIELD = "gpu_aggs/min_used_memory"; + private static final String MAX_GPU_UTILIZATION_FIELD = "gpu_aggs/max_utilization_gpu"; + private static final String MAX_MEMORY_UTILIZATION_FIELD = "gpu_aggs/max_utilization_memory"; + private static final String MAX_MEMORY_USED_FIELD = "gpu_aggs/max_used_memory"; + private static final String INDEX_FIELD = "index"; + private static final String NODENAME_FIELD = "nodename"; + private static final String TYPE_FIELD = "type"; + private static final String DEVICE_NAME_FIELD = "device_name"; + private static final String NODE_TYPE = "node"; + private static final String VALUE_FIELD = "value"; + private static final DateTimeFormatter TIMESTAMP_FIELD_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + + public static List buildIndexRequests(final String indexName, final GpuUsages usages) { + final String timestamp = usages.getTimestamp().format(TIMESTAMP_FIELD_FORMATTER); + final List indexRequests = ListUtils.emptyIfNull(usages.getUsages()).stream() + .map(usage -> buildGpuIndexRequest(indexName, usage, usages.getNodename(), timestamp)) + .collect(Collectors.toList()); + indexRequests.add(buildGpuStatsIndexRequest(indexName, usages.getStats(), usages.getNodename(), timestamp)); + return indexRequests; + } + + private static IndexRequest buildGpuStatsIndexRequest(final String indexName, final GpuUsageStats stats, + final String nodename, final String timestamp) { + return buildIndexRequest(indexName, GPU_AGGS_INDEX_TYPE, buildGpuStatDocument(stats, nodename, timestamp)); + } + + private static IndexRequest buildGpuIndexRequest(final String indexName, final NodeReporterGpuUsages usages, + final String nodename, final String timestamp) { + return buildIndexRequest(indexName, GPU_INDEX_TYPE, buildGpuDocument(usages, nodename, timestamp)); + } + + private static IndexRequest buildIndexRequest(final String indexName, final String indexType, + final XContentBuilder source) { + return new IndexRequest() + .index(indexName) + .type(indexType) + .id(UUID.randomUUID().toString()) + .source(source); + } + + private static XContentBuilder buildGpuDocument(final NodeReporterGpuUsages usages, final String nodename, + final String timestamp) { + try (XContentBuilder jsonBuilder = XContentFactory.jsonBuilder()) { + jsonBuilder.startObject(); + + jsonBuilder.field(GPU_TIMESTAMP_FIELD, timestamp); + + jsonBuilder.startObject(METRICS_FIELD); + fillMetricsValue(jsonBuilder, GPU_UTILIZATION_FIELD, usages.getUtilizationGpu()); + fillMetricsValue(jsonBuilder, MEMORY_UTILIZATION_FIELD, usages.getMemoryUtilization()); + fillMetricsValue(jsonBuilder, MEMORY_USED_FIELD, usages.getMemoryUsed()); + jsonBuilder.endObject(); + + jsonBuilder.startObject(TAGS_FIELD) + .field(INDEX_FIELD, usages.getIndex()) + .field(NODENAME_FIELD, nodename) + .field(TYPE_FIELD, NODE_TYPE) + .field(DEVICE_NAME_FIELD, usages.getName()) + .endObject(); + + return jsonBuilder.endObject(); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to create elasticsearch document: ", e); + } + } + + private static XContentBuilder buildGpuStatDocument(final GpuUsageStats stats, final String nodename, + final String timestamp) { + try (XContentBuilder jsonBuilder = XContentFactory.jsonBuilder()) { + jsonBuilder.startObject(); + + jsonBuilder.field(GPU_STATS_TIMESTAMP_FIELD, timestamp); + + jsonBuilder.startObject(METRICS_FIELD); + fillMetricsValue(jsonBuilder, ACTIVE_GPUS_FIELD, stats.getActiveGpusUtilization()); + fillSummaryMetricsValues(jsonBuilder, stats.getAverage(), + AVG_GPU_UTILIZATION_FIELD, AVG_MEMORY_UTILIZATION_FIELD, AVG_MEMORY_USED_FIELD); + fillSummaryMetricsValues(jsonBuilder, stats.getMin(), + MIN_GPU_UTILIZATION_FIELD, MIN_MEMORY_UTILIZATION_FIELD, MIN_MEMORY_USED_FIELD); + fillSummaryMetricsValues(jsonBuilder, stats.getMax(), + MAX_GPU_UTILIZATION_FIELD, MAX_MEMORY_UTILIZATION_FIELD, MAX_MEMORY_USED_FIELD); + jsonBuilder.endObject(); + + jsonBuilder.startObject(TAGS_FIELD) + .field(NODENAME_FIELD, nodename) + .field(TYPE_FIELD, NODE_TYPE) + .field(DEVICE_NAME_FIELD, stats.getDeviceName()) + .endObject(); + + return jsonBuilder.endObject(); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to create elasticsearch document: ", e); + } + } + + private static void fillMetricsValue(final XContentBuilder builder, final String filed, final Integer value) + throws IOException { + builder.startObject(filed) + .field(VALUE_FIELD, value) + .endObject(); + } + + private static void fillSummaryMetricsValues(final XContentBuilder builder, final GpuUsageSummary summary, + final String utilizationGpuFiled, + final String utilizationMemoryField, + final String usedMemoryField) throws IOException { + fillMetricsValue(builder, utilizationGpuFiled, summary.getGpuUtilization()); + fillMetricsValue(builder, utilizationMemoryField, summary.getMemoryUtilization()); + fillMetricsValue(builder, usedMemoryField, summary.getMemoryUsage()); + } +} diff --git a/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/elasticsearch/MonitoringElasticsearchService.java b/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/elasticsearch/MonitoringElasticsearchService.java new file mode 100644 index 0000000000..d964c48ddd --- /dev/null +++ b/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/elasticsearch/MonitoringElasticsearchService.java @@ -0,0 +1,92 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.monitor.service.elasticsearch; + +import com.epam.pipeline.monitor.model.node.GpuUsages; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.ListUtils; +import org.apache.commons.io.IOUtils; +import org.elasticsearch.action.index.IndexRequest; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.util.Assert; +import org.springframework.util.ResourceUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.stream.Collectors; + +@Service +@Slf4j +public class MonitoringElasticsearchService { + + private static final DateTimeFormatter INDEX_FORMATTER = DateTimeFormatter.ofPattern("yyyy.MM.dd"); + + private final ElasticsearchService elasticsearchService; + private final String indexNamePrefix; + private final String indexMappingsFile; + + public MonitoringElasticsearchService(final ElasticsearchService elasticsearchService, + @Value("${es.gpu.monitor.index.prefix:cp-gpu-monitor-}") + final String indexNamePrefix, + @Value("${es.gpu.monitor.index.mappings}") final String indexMappingsFile) { + this.elasticsearchService = elasticsearchService; + this.indexNamePrefix = indexNamePrefix; + this.indexMappingsFile = indexMappingsFile; + } + + public void saveGpuUsages(final List usages) { + if (CollectionUtils.isEmpty(usages)) { + return; + } + final String indexName = indexNamePrefix + LocalDateTime.now().format(INDEX_FORMATTER); + final String mappingsJson = readMappingsJson(); + elasticsearchService.createIndexIfNotExists(indexName, mappingsJson); + final List indexRequests = ListUtils.emptyIfNull(usages).stream() + .flatMap(usage -> GpuMonitorIndexHelper.buildIndexRequests(indexName, usage).stream()) + .collect(Collectors.toList()); + elasticsearchService.insertBulkDocuments(indexRequests); + } + + private String readMappingsJson() { + try { + return IOUtils.toString(openJsonMapping(indexMappingsFile), Charset.defaultCharset()); + } catch (IOException e) { + throw new IllegalArgumentException("Cannot read mappings file: " + indexMappingsFile); + } + } + + private InputStream openJsonMapping(final String path) throws IOException { + if (path.startsWith(ResourceUtils.CLASSPATH_URL_PREFIX)) { + final InputStream classPathResource = getClass().getResourceAsStream(path + .substring(ResourceUtils.CLASSPATH_URL_PREFIX.length())); + Assert.notNull(classPathResource, String.format("Failed to resolve path: %s", path)); + return classPathResource; + } + if (path.startsWith(ResourceUtils.FILE_URL_PREFIX)) { + return Files.newInputStream(Paths.get(path.substring(ResourceUtils.FILE_URL_PREFIX.length()))); + } + throw new IllegalArgumentException("Unsupported mapping file: " + path); + } +} diff --git a/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/k8s/KubernetesUtils.java b/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/k8s/KubernetesUtils.java new file mode 100644 index 0000000000..c155063da2 --- /dev/null +++ b/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/k8s/KubernetesUtils.java @@ -0,0 +1,84 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.monitor.service.k8s; + +import io.fabric8.kubernetes.api.model.Node; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodSpec; +import io.fabric8.kubernetes.api.model.PodStatus; +import io.fabric8.kubernetes.client.ConfigBuilder; +import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.commons.collections4.ListUtils; + +import java.util.List; +import java.util.Optional; +import java.util.Set; + +public final class KubernetesUtils { + private static final int CONNECTION_TIMEOUT_MS = 2 * 1000; + private static final String UNRESOLVED_POD = "unresolved pod"; + private static final String UNRESOLVED_NODE = "unresolved node"; + + public static KubernetesClient getKubernetesClient() { + return new DefaultKubernetesClient(new ConfigBuilder() + .withConnectionTimeout(CONNECTION_TIMEOUT_MS) + .build()); + } + + public static String getPodName(final Pod pod) { + return Optional.ofNullable(pod) + .map(Pod::getMetadata) + .map(ObjectMeta::getName) + .orElse(UNRESOLVED_POD); + } + + public static String getNodeName(final Pod pod) { + return Optional.ofNullable(pod) + .map(Pod::getSpec) + .map(PodSpec::getNodeName) + .orElse(UNRESOLVED_NODE); + } + + public static Optional getPodIp(final Pod pod) { + return Optional.ofNullable(pod) + .map(Pod::getStatus) + .map(PodStatus::getPodIP); + } + + public static List findNodesByLabel(final KubernetesClient client, final String key, + final Set values) { + return ListUtils.emptyIfNull(client.nodes() + .withLabelIn(key, values.toArray(new String[0])) + .list() + .getItems()); + } + + public static List findPodsByLabel(final KubernetesClient client, final String key, final String value, + final String namespace) { + return ListUtils.emptyIfNull(client.pods() + .inNamespace(namespace) + .withLabel(key, value) + .list() + .getItems()); + } + + private KubernetesUtils() { + // no-op + } +} diff --git a/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/reporter/NodeReporterService.java b/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/reporter/NodeReporterService.java new file mode 100644 index 0000000000..a3a083d2a6 --- /dev/null +++ b/monitoring-service/src/main/java/com/epam/pipeline/monitor/service/reporter/NodeReporterService.java @@ -0,0 +1,119 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.monitor.service.reporter; + +import com.epam.pipeline.monitor.model.node.GpuUsages; +import com.epam.pipeline.monitor.rest.NodeReporterAPIExecutor; +import com.epam.pipeline.monitor.service.k8s.KubernetesUtils; +import io.fabric8.kubernetes.api.model.Node; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +@Service +@Slf4j +public class NodeReporterService { + private static final String NAME_LABEL = "name"; + private static final String CLOUD_INS_TYPE_LABEL = "cloud_ins_type"; + + private final String reportingPodName; + private final String namespace; + private final Executor executor; + private final NodeReporterAPIExecutor nodeReporterClient; + + public NodeReporterService(@Value("${node.reporter.srv.pod.name:cp-node-reporter}") + final String reportingPodName, + @Value("${node.reporter.srv.namespace:default}") final String namespace, + @Value("${monitoring.gpu.usage.pool.size:1}") final int poolSize, + final NodeReporterAPIExecutor nodeReporterClient) { + this.reportingPodName = reportingPodName; + this.namespace = namespace; + this.executor = Executors.newFixedThreadPool(poolSize); + this.nodeReporterClient = nodeReporterClient; + } + + public List collectGpuUsages(final Set gpuInstanceTypes) { + try (KubernetesClient client = KubernetesUtils.getKubernetesClient()) { + final Set nodeNames = getGpuNodeNames(client, gpuInstanceTypes); + final List pods = getReportingPods(client, nodeNames); + return requestUsages(pods).stream() + .map(CompletableFuture::join) + .collect(Collectors.toList()); + } catch (KubernetesClientException e) { + log.error("An error occurred while sending request to k8s", e); + return Collections.emptyList(); + } + } + + private Set getGpuNodeNames(final KubernetesClient client, final Set gpuInstanceTypes) { + return KubernetesUtils.findNodesByLabel(client, CLOUD_INS_TYPE_LABEL, gpuInstanceTypes).stream() + .map(Node::getMetadata) + .filter(Objects::nonNull) + .map(ObjectMeta::getName) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + } + + private List getReportingPods(final KubernetesClient client, final Set monitoringNodeNames) { + return KubernetesUtils.findPodsByLabel(client, NAME_LABEL, reportingPodName, namespace).stream() + .filter(pod -> monitoringNodeNames.contains(pod.getSpec().getNodeName())) + .collect(Collectors.toList()); + } + + private List> requestUsages(final List reporterPods) { + return reporterPods.stream() + .map(pod -> requestUsages(pod) + .exceptionally(e -> { + log.error("Failed to collect usages from {} for {}.", + KubernetesUtils.getPodName(pod), + KubernetesUtils.getNodeName(pod), e); + return GpuUsages.builder().build(); + })) + .collect(Collectors.toList()); + } + + private CompletableFuture requestUsages(final Pod pod) { + return CompletableFuture.supplyAsync(() -> getStats(pod).orElseThrow(IllegalArgumentException::new), executor); + } + + private Optional getStats(final Pod pod) { + final String nodename = KubernetesUtils.getNodeName(pod); + log.info("Retrieving usages from {} for {}...", KubernetesUtils.getPodName(pod), nodename); + return KubernetesUtils.getPodIp(pod) + .map(nodeReporterClient::loadGpuStats) + .map(stat -> GpuUsages.builder() + .usages(stat) + .nodename(nodename) + .timestamp(LocalDateTime.now()) + .build()); + } +} diff --git a/monitoring-service/src/main/resources/application.properties b/monitoring-service/src/main/resources/application.properties index 92559579cb..5c71eb4928 100644 --- a/monitoring-service/src/main/resources/application.properties +++ b/monitoring-service/src/main/resources/application.properties @@ -18,3 +18,21 @@ preference.name.usage.node.pool.enable=monitoring.node.pool.usage.enable preference.name.usage.node.pool.clean.enable=monitoring.node.pool.usage.clean.enable preference.name.usage.node.pool.clean.delay=monitoring.node.pool.usage.clean.delay preference.name.usage.node.pool.store.period=monitoring.node.pool.usage.store.days +preference.name.usage.node.gpu.delay=monitoring.gpu.usage.delay +preference.name.usage.node.gpu.enable=monitoring.gpu.usage.enable + +# Elasticsearch +es.port=${CP_HEAPSTER_ELK_INTERNAL_PORT:30094} +es.host=${CP_HEAPSTER_ELK_INTERNAL_HOST:cp-heapster-elk.default.svc.cluster.local} +es.gpu.monitor.index.prefix=cp-gpu-monitor- +es.gpu.monitor.index.mappings=classpath:/templates/cp_monitor_gpu_index.json +es.index.bulk.size=1000 + +# Node Reporter Service +node.reporter.srv.pod.name=${CP_VM_MONITOR_NODE_REPORTING_POD_NAME:cp-node-reporter} +node.reporter.srv.namespace=${CP_VM_MONITOR_NODE_NAMESPACE:default} +node.reporter.srv.schema=${CP_VM_MONITOR_NODE_STATS_REQUEST_SCHEMA:http} +node.reporter.srv.port=${CP_VM_MONITOR_NODE_STATS_REQUEST_PORT:8000} + +# Monitor Gpu Usages +monitoring.gpu.usage.pool.size=${CP_MONITOR_SRV_GPU_USAGE_POOL_SIZE:1} diff --git a/monitoring-service/src/main/resources/templates/cp_monitor_gpu_index.json b/monitoring-service/src/main/resources/templates/cp_monitor_gpu_index.json new file mode 100644 index 0000000000..7414777e6e --- /dev/null +++ b/monitoring-service/src/main/resources/templates/cp_monitor_gpu_index.json @@ -0,0 +1,167 @@ +{ + "mappings": { + "gpu": { + "properties": { + "GpuMetricsTimestamp": { + "type": "date" + }, + "Metrics": { + "properties": { + "gpu/utilization_gpu": { + "properties": { + "value": { + "type": "double" + } + } + }, + "gpu/utilization_memory": { + "properties": { + "value": { + "type": "double" + } + } + }, + "gpu/used_memory": { + "properties": { + "value": { + "type": "double" + } + } + } + } + }, + "MetricsTags": { + "properties": { + "index": { + "type": "text", + "fields": { + "raw": { + "type": "keyword" + } + } + }, + "nodename": { + "type": "text", + "fields": { + "raw": { + "type": "keyword" + } + } + }, + "type": { + "type": "keyword" + }, + "device_name": { + "type": "keyword" + } + } + } + } + }, + "gpu_aggs": { + "properties": { + "GpuAggsMetricsTimestamp": { + "type": "date" + }, + "Metrics": { + "properties": { + "gpu_aggs/avg_utilization_gpu": { + "properties": { + "value": { + "type": "integer" + } + } + }, + "gpu_aggs/avg_utilization_memory": { + "properties": { + "value": { + "type": "integer" + } + } + }, + "gpu_aggs/avg_used_memory": { + "properties": { + "value": { + "type": "integer" + } + } + }, + "gpu_aggs/active_gpus": { + "properties": { + "value": { + "type": "integer" + } + } + }, + "gpu_aggs/min_utilization_gpu": { + "properties": { + "value": { + "type": "integer" + } + } + }, + "gpu_aggs/min_utilization_memory": { + "properties": { + "value": { + "type": "integer" + } + } + }, + "gpu_aggs/min_used_memory": { + "properties": { + "value": { + "type": "integer" + } + } + }, + "gpu_aggs/max_utilization_gpu": { + "properties": { + "value": { + "type": "integer" + } + } + }, + "gpu_aggs/max_utilization_memory": { + "properties": { + "value": { + "type": "integer" + } + } + }, + "gpu_aggs/max_used_memory": { + "properties": { + "value": { + "type": "integer" + } + } + } + } + }, + "MetricsTags": { + "properties": { + "nodename": { + "type": "text", + "fields": { + "raw": { + "type": "keyword" + } + } + }, + "type": { + "type": "keyword" + }, + "device_name": { + "type": "keyword" + } + } + } + } + } + }, + "settings": { + "index": { + "number_of_shards": 1, + "number_of_replicas": 0 + } + } +} \ No newline at end of file diff --git a/monitoring-service/src/test/java/com/epam/pipeline/monitor/monitoring/node/GpuUsageMonitoringServiceTest.java b/monitoring-service/src/test/java/com/epam/pipeline/monitor/monitoring/node/GpuUsageMonitoringServiceTest.java new file mode 100644 index 0000000000..8610245cdd --- /dev/null +++ b/monitoring-service/src/test/java/com/epam/pipeline/monitor/monitoring/node/GpuUsageMonitoringServiceTest.java @@ -0,0 +1,210 @@ +/* + * Copyright 2017-2024 EPAM Systems, Inc. (https://www.epam.com/) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.epam.pipeline.monitor.monitoring.node; + +import com.epam.pipeline.entity.reporter.NodeReporterGpuUsages; +import com.epam.pipeline.monitor.model.node.GpuUsageStats; +import com.epam.pipeline.monitor.model.node.GpuUsageSummary; +import com.epam.pipeline.monitor.model.node.GpuUsages; +import com.epam.pipeline.monitor.rest.CloudPipelineAPIClient; +import com.epam.pipeline.monitor.service.InstanceTypesLoader; +import com.epam.pipeline.monitor.service.elasticsearch.MonitoringElasticsearchService; +import com.epam.pipeline.monitor.service.reporter.NodeReporterService; +import org.junit.jupiter.api.Test; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +class GpuUsageMonitoringServiceTest { + private static final String TEST = "test"; + private static final String NODE_1 = "node1"; + private static final String NODE_2 = "node2"; + private static final String GPU_ID_1 = "0"; + private static final String GPU_ID_2 = "1"; + private static final int GPU_UTILIZATION = 80; + private static final int MEMORY_USED = 2000; + private static final int TOTAL_MEMORY = 20000; + private static final int MEMORY_UTILIZATION_1 = 10; + private static final int TEST_VALUE_1 = 100; + private static final int TEST_VALUE_2 = 50; + + private final CloudPipelineAPIClient cloudPipelineClient = mock(CloudPipelineAPIClient.class); + private final NodeReporterService nodeReporterService = mock(NodeReporterService.class); + private final MonitoringElasticsearchService monitoringElasticsearchService = + mock(MonitoringElasticsearchService.class); + private final InstanceTypesLoader instanceTypesLoader = mock(InstanceTypesLoader.class); + private final GpuUsageMonitoringService monitor = new GpuUsageMonitoringService( + TEST, cloudPipelineClient, nodeReporterService, monitoringElasticsearchService, instanceTypesLoader); + + @Test + void shouldSkipProcessIfNotRequired() { + when(cloudPipelineClient.getBooleanPreference(TEST)).thenReturn(false); + monitor.monitor(); + verifyZeroInteractions(nodeReporterService, monitoringElasticsearchService); + } + + @Test + void shouldSkipProcessIfNoUsagesLoaded() { + when(cloudPipelineClient.getBooleanPreference(TEST)).thenReturn(true); + when(nodeReporterService.collectGpuUsages(any())).thenReturn(null); + monitor.monitor(); + verifyZeroInteractions(monitoringElasticsearchService); + } + + // case example: unexpected error during node reporter query + @Test + void shouldProcessEmptyUsages() { + final List usages = Collections.singletonList(GpuUsages.builder().build()); + when(cloudPipelineClient.getBooleanPreference(TEST)).thenReturn(true); + when(nodeReporterService.collectGpuUsages(any())).thenReturn(usages); + monitor.monitor(); + verify(monitoringElasticsearchService).saveGpuUsages(Collections.emptyList()); + } + + @Test + void shouldProcessGpuUsages() { + final NodeReporterGpuUsages usage1 = NodeReporterGpuUsages.builder() + .name(TEST) + .index(GPU_ID_1) + .utilizationGpu(GPU_UTILIZATION) + .memoryUsed(MEMORY_USED) + .memoryTotal(TOTAL_MEMORY) + .build(); + final NodeReporterGpuUsages usage2 = NodeReporterGpuUsages.builder() + .name(TEST) + .index(GPU_ID_2) + .utilizationGpu(GPU_UTILIZATION) + .memoryUsed(MEMORY_USED) + .memoryTotal(TOTAL_MEMORY) + .build(); + + final NodeReporterGpuUsages usage3 = NodeReporterGpuUsages.builder() + .name(TEST) + .index(GPU_ID_1) + .utilizationGpu(GPU_UTILIZATION) + .memoryUsed(MEMORY_USED) + .memoryTotal(TOTAL_MEMORY) + .build(); + final NodeReporterGpuUsages usage4 = NodeReporterGpuUsages.builder() + .name(TEST) + .index(GPU_ID_2) + .utilizationGpu(0) + .memoryUsed(0) + .memoryTotal(TOTAL_MEMORY) + .build(); + + final GpuUsages gpuUsage1 = GpuUsages.builder() + .timestamp(LocalDateTime.now()) + .nodename(NODE_1) + .usages(Arrays.asList(usage1, usage2)) + .build(); + + final GpuUsages gpuUsage2 = GpuUsages.builder() + .timestamp(LocalDateTime.now()) + .nodename(NODE_2) + .usages(Arrays.asList(usage3, usage4)) + .build(); + + final List usages = Arrays.asList(gpuUsage1, gpuUsage2); + when(cloudPipelineClient.getBooleanPreference(TEST)).thenReturn(true); + when(nodeReporterService.collectGpuUsages(any())).thenReturn(usages); + + monitor.monitor(); + + verify(monitoringElasticsearchService) + .saveGpuUsages(Arrays.asList(expectedGpuUsages1(gpuUsage1), expectedGpuUsages2(gpuUsage2))); + } + + private static GpuUsages expectedGpuUsages1(final GpuUsages gpuUsage1) { + final GpuUsageSummary average = GpuUsageSummary.builder() + .gpuUtilization(GPU_UTILIZATION) + .memoryUsage(MEMORY_USED) + .memoryUtilization(MEMORY_UTILIZATION_1) + .build(); + final GpuUsageSummary min = GpuUsageSummary.builder() + .gpuUtilization(GPU_UTILIZATION) + .memoryUsage(MEMORY_USED) + .memoryUtilization(MEMORY_UTILIZATION_1) + .build(); + final GpuUsageSummary max = GpuUsageSummary.builder() + .gpuUtilization(GPU_UTILIZATION) + .memoryUsage(MEMORY_USED) + .memoryUtilization(MEMORY_UTILIZATION_1) + .build(); + final GpuUsageStats stats = GpuUsageStats.builder() + .average(average) + .min(min) + .max(max) + .activeGpusUtilization(TEST_VALUE_1) + .deviceName(TEST) + .build(); + + return GpuUsages.builder() + .nodename(gpuUsage1.getNodename()) + .timestamp(gpuUsage1.getTimestamp()) + .usages(new ArrayList<>(gpuUsage1.getUsages().stream() + .peek(value -> value.setMemoryUtilization(MEMORY_UTILIZATION_1)) + .collect(Collectors.toList()))) + .stats(stats) + .build(); + } + + private static GpuUsages expectedGpuUsages2(final GpuUsages gpuUsage2) { + final GpuUsageSummary average = GpuUsageSummary.builder() + .gpuUtilization(GPU_UTILIZATION / 2) + .memoryUsage(MEMORY_USED / 2) + .memoryUtilization(MEMORY_UTILIZATION_1 / 2) + .build(); + final GpuUsageSummary min = GpuUsageSummary.builder() + .gpuUtilization(0) + .memoryUsage(0) + .memoryUtilization(0) + .build(); + final GpuUsageSummary max = GpuUsageSummary.builder() + .gpuUtilization(GPU_UTILIZATION) + .memoryUsage(MEMORY_USED) + .memoryUtilization(MEMORY_UTILIZATION_1) + .build(); + final GpuUsageStats stats = GpuUsageStats.builder() + .deviceName(TEST) + .average(average) + .min(min) + .max(max) + .activeGpusUtilization(TEST_VALUE_2) + .build(); + + return GpuUsages.builder() + .nodename(gpuUsage2.getNodename()) + .timestamp(gpuUsage2.getTimestamp()) + .usages(new ArrayList<>(gpuUsage2.getUsages().stream() + .peek(value -> value.setMemoryUtilization( + GPU_ID_1.equals(value.getIndex()) ? MEMORY_UTILIZATION_1 : 0)) + .collect(Collectors.toList()))) + .stats(stats) + .build(); + } +}