Skip to content

Commit

Permalink
API: Show GPU stats on the node utilization page - fixes (#3640)
Browse files Browse the repository at this point in the history
* GPU Stats API: allow non-existing indices

* GPU Stats API: fix for overlapping intervals

* GPU Stats API: fix for GPU instance types loading during monitoring process

* GPU Stats API: refactor load gpu charts
  • Loading branch information
ekazachkova authored Aug 20, 2024
1 parent b959e1b commit dba383d
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private Request convertToRequest(final SearchRequest searchRequest) throws IOExc
request.addParameter("allow_no_indices", "true");
request.addParameter("expand_wildcards", "open,closed");
request.addParameter("search_type", "query_then_fetch");
request.addParameter("ignore_unavailable", "false");
request.addParameter("ignore_unavailable", "true");
request.addParameter("batched_reduce_size", "512");
if (searchRequest.source() != null) {
request.setEntity(createEntity(searchRequest.source(), XContentType.JSON));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,16 +328,12 @@ private List<MonitoringStats> getGpuCharts(final List<GpuMetricsGranularity> loa
if (!GpuMetricsGranularity.hasAggregations(loadTypes) && !GpuMetricsGranularity.hasDetails(loadTypes)) {
return null;
}
final List<MonitoringStats> charts = new ArrayList<>();
if (GpuMetricsGranularity.hasAggregations(loadTypes)) {
charts.addAll(requestCharts(aggregationRequester, interval, nodeName, start, end));
}
if (GpuMetricsGranularity.hasDetails(loadTypes)) {
final GPUDetailsRequester detailsRequester = new GPUDetailsRequester(client);
charts.addAll(requestCharts(detailsRequester, interval, nodeName, start, end));
}
if (GpuMetricsGranularity.hasDetails(loadTypes) && GpuMetricsGranularity.hasAggregations(loadTypes)) {
return charts.stream()
final List<MonitoringStats> charts = new ArrayList<>(aggregationRequester
.requestStats(nodeName, start, end, interval));
final GPUDetailsRequester detailsRequester = new GPUDetailsRequester(client);
charts.addAll(detailsRequester.requestStats(nodeName, start, end, interval));
return sortGpuCharts(charts.stream()
.collect(Collectors.groupingBy(MonitoringStats::getStartTime,
Collectors.reducing(this::mergeGpuStats)))
.values().stream()
Expand All @@ -346,15 +342,13 @@ private List<MonitoringStats> getGpuCharts(final List<GpuMetricsGranularity> loa
.filter(stats -> stats.getGpuUsage() != null && stats.getGpuDetails() != null)
.map(stats -> statsWithinRegion(stats, start, end, interval))
.filter(Optional::isPresent)
.map(Optional::get)
.sorted(Comparator.comparing(MonitoringStats::getStartTime,
Comparator.comparing(this::asMonitoringDateTime)))
.collect(Collectors.toList());
.map(Optional::get));
}
return charts.stream()
.sorted(Comparator.comparing(MonitoringStats::getStartTime,
Comparator.comparing(this::asMonitoringDateTime)))
.collect(Collectors.toList());
if (GpuMetricsGranularity.hasAggregations(loadTypes)) {
return sortGpuCharts(requestCharts(aggregationRequester, interval, nodeName, start, end).stream());
}
final GPUDetailsRequester detailsRequester = new GPUDetailsRequester(client);
return sortGpuCharts(requestCharts(detailsRequester, interval, nodeName, start, end).stream());
}

private List<MonitoringStats> requestCharts(final AbstractMetricRequester requester, final Duration interval,
Expand All @@ -365,4 +359,11 @@ private List<MonitoringStats> requestCharts(final AbstractMetricRequester reques
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

private List<MonitoringStats> sortGpuCharts(final Stream<MonitoringStats> stats) {
return stats
.sorted(Comparator.comparing(MonitoringStats::getStartTime,
Comparator.comparing(this::asMonitoringDateTime)))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.epam.pipeline.client.pipeline;

import com.epam.pipeline.entity.app.ApplicationInfo;
import com.epam.pipeline.entity.cluster.AllowedInstanceAndPriceTypes;
import com.epam.pipeline.entity.cluster.InstanceType;
import com.epam.pipeline.entity.cluster.NodeDisk;
import com.epam.pipeline.entity.cluster.NodeInstance;
Expand Down Expand Up @@ -297,6 +298,10 @@ Call<Result<List<PipelineRun>>> loadRunsActivityStats(@Query(FROM) String from,
@GET("cluster/instance/loadAll")
Call<Result<List<InstanceType>>> loadAllInstanceTypes();

@GET("cluster/instance/allowed")
Call<Result<AllowedInstanceAndPriceTypes>> loadAllowedInstanceAndPriceTypesForRegion(
@Query(REGION_ID) Long regionId);

@GET("cluster/node/{id}/disks")
Call<Result<List<NodeDisk>>> loadNodeDisks(@Path(ID) String nodeId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
@Value
public class AllowedInstanceAndPriceTypes {

@JsonProperty("allowed.instance.types")
private final List<InstanceType> allowedInstanceTypes;
@JsonProperty("cluster.allowed.instance.types")
List<InstanceType> allowedInstanceTypes;

@JsonProperty("allowed.instance.docker.types")
private final List<InstanceType> allowedInstanceDockerTypes;
@JsonProperty("cluster.allowed.instance.types.docker")
List<InstanceType> allowedInstanceDockerTypes;

@JsonProperty("allowed.price.types")
private final List<String> allowedPriceTypes;
@JsonProperty("cluster.allowed.price.types")
List<String> allowedPriceTypes;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import com.epam.pipeline.client.pipeline.CloudPipelineAPI;
import com.epam.pipeline.client.pipeline.CloudPipelineApiBuilder;
import com.epam.pipeline.client.pipeline.CloudPipelineApiExecutor;
import com.epam.pipeline.entity.cluster.AllowedInstanceAndPriceTypes;
import com.epam.pipeline.entity.cluster.InstanceType;
import com.epam.pipeline.entity.cluster.pool.NodePool;
import com.epam.pipeline.entity.pipeline.PipelineRun;
import com.epam.pipeline.entity.preference.Preference;
import com.epam.pipeline.entity.region.AbstractCloudRegion;
import com.epam.pipeline.vo.cluster.pool.NodePoolUsage;
import com.epam.pipeline.vo.user.OnlineUsers;
import org.apache.commons.collections4.ListUtils;
Expand Down Expand Up @@ -96,6 +98,14 @@ public List<InstanceType> loadAllInstanceTypes() {
return ListUtils.emptyIfNull(executor.execute(cloudPipelineAPI.loadAllInstanceTypes()));
}

public List<? extends AbstractCloudRegion> loadAllRegions() {
return ListUtils.emptyIfNull(executor.execute(cloudPipelineAPI.loadAllRegions()));
}

public AllowedInstanceAndPriceTypes loadAllowedInstanceAndPriceTypesForRegion(final Long regionId) {
return executor.execute(cloudPipelineAPI.loadAllowedInstanceAndPriceTypesForRegion(regionId));
}

public void archiveRuns() {
executor.execute(cloudPipelineAPI.archiveRuns());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@
package com.epam.pipeline.monitor.service;

import com.epam.pipeline.entity.cluster.InstanceType;
import com.epam.pipeline.entity.region.AbstractCloudRegion;
import com.epam.pipeline.monitor.rest.CloudPipelineAPIClient;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Refreshes GPU instance types
* Refreshes GPU instance types across all regions
*/
@Service
@RequiredArgsConstructor
Expand All @@ -42,24 +46,30 @@ public class InstanceTypesLoader {

@PostConstruct
public void init() {
refreshInstances();
}

@Scheduled(fixedDelayString = "${refresh.instances.timeout:86400000}")
public void refreshInstances() {
try {
final Set<String> loadedTypes = client.loadAllInstanceTypes().stream()
.filter(it -> it.getGpu() > 0)
.map(InstanceType::getName)
.filter(StringUtils::isNotBlank)
.collect(Collectors.toSet());
gpuInstanceTypes.clear();
gpuInstanceTypes.addAll(loadedTypes);
refreshInstances();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

@Scheduled(fixedDelayString = "${refresh.instances.timeout:86400000}")
public void refreshInstances() {
final Set<String> loadedTypes = client.loadAllRegions().stream()
.map(AbstractCloudRegion::getId)
.map(client::loadAllowedInstanceAndPriceTypesForRegion)
.filter(Objects::nonNull)
.flatMap(data -> Stream.concat(
ListUtils.emptyIfNull(data.getAllowedInstanceDockerTypes()).stream(),
ListUtils.emptyIfNull(data.getAllowedInstanceTypes()).stream()))
.filter(it -> it.getGpu() > 0)
.map(InstanceType::getName)
.filter(StringUtils::isNotBlank)
.collect(Collectors.toSet());
gpuInstanceTypes.clear();
gpuInstanceTypes.addAll(loadedTypes);
}

public Set<String> loadGpuInstanceTypes() {
return gpuInstanceTypes;
}
Expand Down

0 comments on commit dba383d

Please sign in to comment.