Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Issue #3794: Support cloud instances in 'Cluster state' panel - cloud…
Browse files Browse the repository at this point in the history
… nodes filter
ekazachkova committed Dec 26, 2024
1 parent 751cd8f commit 3547a1e
Showing 13 changed files with 118 additions and 61 deletions.
Original file line number Diff line number Diff line change
@@ -121,18 +121,20 @@ public Result<List<NodeInstance>> loadNodes(
return Result.success(clusterApiService.getNodes(machineType));
}

@RequestMapping(value = "/cluster/node/filter", method = RequestMethod.POST)
@PostMapping("/cluster/node/filter")
@ResponseBody
@ApiOperation(
value = "Returns all ec2 nodes used in cluster, filtered by runId or address",
notes = "Returns all ec2 nodes used in cluster, filtered by runId or address",
value = "Returns all nodes used in cluster, filtered by runId or address",
notes = "Returns all nodes used in cluster, filtered by runId or address",
produces = MediaType.APPLICATION_JSON_VALUE
)
@ApiResponses(
value = {@ApiResponse(code = HTTP_STATUS_OK, message = API_STATUS_DESCRIPTION)}
)
public Result<List<NodeInstance>> filterNodes(@RequestBody FilterNodesVO filterNodesVO) {
return Result.success(clusterApiService.filterNodes(filterNodesVO));
public Result<List<NodeInstance>> filterNodes(@RequestBody final FilterNodesVO filterNodesVO,
@RequestParam(required = false, defaultValue = KUBE)
final MachineType machineType) {
return Result.success(clusterApiService.filterNodes(filterNodesVO, machineType));
}

@GetMapping(value = "/cluster/node/{name}/load")
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@

package com.epam.pipeline.manager.cloud;

import com.epam.pipeline.controller.vo.FilterNodesVO;
import com.epam.pipeline.entity.cloud.CloudInstanceState;
import com.epam.pipeline.entity.cloud.InstanceDNSRecord;
import com.epam.pipeline.entity.cloud.InstanceTerminationState;
@@ -24,7 +25,6 @@
import com.epam.pipeline.entity.cluster.InstanceOffer;
import com.epam.pipeline.entity.cluster.InstanceType;
import com.epam.pipeline.entity.cluster.NodeInstance;
import com.epam.pipeline.entity.cluster.NodeRegionLabels;
import com.epam.pipeline.entity.cluster.pool.NodePool;
import com.epam.pipeline.entity.pipeline.DiskAttachRequest;
import com.epam.pipeline.entity.pipeline.RunInstance;
@@ -110,7 +110,7 @@ RunInstance scaleUpNode(Long runId, RunInstance instance, Map<String, String> ru

void deleteInstanceTags(Long regionId, String runId, Set<String> tagNames);

List<NodeInstance> getCloudNodes(Long regionId);
List<NodeInstance> getCloudNodes(Long regionId, FilterNodesVO filterNodesVO);

Optional<NodeInstance> findCloudNode(Long regionId, String instanceId);
}
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@

import com.epam.pipeline.common.MessageConstants;
import com.epam.pipeline.common.MessageHelper;
import com.epam.pipeline.controller.vo.FilterNodesVO;
import com.epam.pipeline.entity.cloud.CloudInstanceState;
import com.epam.pipeline.entity.cloud.InstanceDNSRecord;
import com.epam.pipeline.entity.cloud.InstanceTerminationState;
@@ -301,9 +302,9 @@ public boolean instanceScalingSupported(final Long regionId) {
}

@Override
public List<NodeInstance> getCloudNodes(final Long regionId) {
public List<NodeInstance> getCloudNodes(final Long regionId, final FilterNodesVO filterNodesVO) {
final AbstractCloudRegion region = regionManager.loadOrDefault(regionId);
return getInstanceService(region).getCloudNodes(region);
return getInstanceService(region).getCloudNodes(region, filterNodesVO);
}

@Override
Original file line number Diff line number Diff line change
@@ -16,12 +16,12 @@

package com.epam.pipeline.manager.cloud;

import com.epam.pipeline.controller.vo.FilterNodesVO;
import com.epam.pipeline.entity.cloud.CloudInstanceState;
import com.epam.pipeline.entity.cloud.InstanceDNSRecord;
import com.epam.pipeline.entity.cloud.InstanceTerminationState;
import com.epam.pipeline.entity.cloud.CloudInstanceOperationResult;
import com.epam.pipeline.entity.cluster.InstanceDisk;
import com.epam.pipeline.entity.cluster.InstanceImage;
import com.epam.pipeline.entity.cluster.NodeInstance;
import com.epam.pipeline.entity.cluster.pool.NodePool;
import com.epam.pipeline.entity.pipeline.DiskAttachRequest;
@@ -205,12 +205,14 @@ default boolean isNodeExpired(T region, Long runId, Integer keepAliveMinutes) {
void deleteInstanceTags(T region, String runId, Set<String> tagNames);

/**
* Loads all cloud instances available for specified region. Filter by tags can be applied.
* Loads all cloud instances available for specified region.
* If specified {@link AbstractCloudRegion#getClusterStateRegionProperties()} region tags filter shall be applied.
*
* @param region region to load
* @param filter if not specified all nodes shall be loaded
* @return loaded instances
*/
List<NodeInstance> getCloudNodes(T region);
List<NodeInstance> getCloudNodes(T region, FilterNodesVO filter);

/**
* Finds cloud instance with specified instance ID in requested region. Empty if no instance found.
Original file line number Diff line number Diff line change
@@ -22,14 +22,13 @@
import com.amazonaws.services.ec2.model.InstanceStateName;
import com.amazonaws.services.ec2.model.Placement;
import com.amazonaws.services.ec2.model.Tag;
import com.epam.pipeline.controller.vo.InstanceOfferRequestVO;
import com.epam.pipeline.controller.vo.FilterNodesVO;
import com.epam.pipeline.entity.cloud.CloudInstanceState;
import com.epam.pipeline.entity.cloud.InstanceDNSRecord;
import com.epam.pipeline.entity.cloud.InstanceDNSRecordFormat;
import com.epam.pipeline.entity.cloud.InstanceTerminationState;
import com.epam.pipeline.entity.cloud.CloudInstanceOperationResult;
import com.epam.pipeline.entity.cluster.InstanceDisk;
import com.epam.pipeline.entity.cluster.InstanceImage;
import com.epam.pipeline.entity.cluster.MachineType;
import com.epam.pipeline.entity.cluster.NodeInstance;
import com.epam.pipeline.entity.cluster.NodeInstanceAddress;
@@ -370,8 +369,8 @@ public void deleteInstanceTags(final AwsRegion region, final String runId, final
}

@Override
public List<NodeInstance> getCloudNodes(final AwsRegion region) {
return ec2Helper.findCloudNodes(region).stream()
public List<NodeInstance> getCloudNodes(final AwsRegion region, final FilterNodesVO filter) {
return ec2Helper.findCloudNodes(region, filter).stream()
.map(instance -> cloudInstanceToNodeInstance(instance, region))
.collect(Collectors.toList());
}
Original file line number Diff line number Diff line change
@@ -57,6 +57,7 @@
import com.amazonaws.waiters.WaiterParameters;
import com.epam.pipeline.common.MessageConstants;
import com.epam.pipeline.common.MessageHelper;
import com.epam.pipeline.controller.vo.FilterNodesVO;
import com.epam.pipeline.entity.cloud.CloudInstanceOperationResult;
import com.epam.pipeline.entity.cluster.CloudRegionsConfiguration;
import com.epam.pipeline.entity.cluster.GpuDevice;
@@ -67,6 +68,7 @@
import com.epam.pipeline.manager.preference.PreferenceManager;
import com.epam.pipeline.manager.preference.SystemPreferences;
import com.epam.pipeline.utils.CommonUtils;
import com.epam.pipeline.utils.NetworkUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -88,6 +90,7 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -103,6 +106,8 @@ public class EC2Helper implements EC2GpuHelper {
private static final ZoneId UTC = ZoneId.of("UTC");
private static final String NAME_TAG = "tag:Name";
private static final String INSTANCE_STATE_NAME = "instance-state-name";
private static final String PRIVATE_IP_ADDRESS = "private-ip-address";
private static final String INSTANCE_ID = "instance-id";
private static final String PENDING_STATE = "pending";
private static final String RUNNING_STATE = "running";
private static final String STOPPING_STATE = "stopping";
@@ -467,11 +472,14 @@ public List<InstanceDisk> loadAttachedVolumes(final String runId, final AwsRegio
return attachedVolumes(client, instance).map(this::toDisk).collect(Collectors.toList());
}

public List<Instance> findCloudNodes(final AwsRegion awsRegion) {
final Integer preference = preferenceManager.getPreference(SystemPreferences.CLUSTER_INSTANCE_LOAD_TIMEOUT);
public List<Instance> findCloudNodes(final AwsRegion awsRegion, final FilterNodesVO filter) {
final Integer timeout = preferenceManager.getPreference(SystemPreferences.CLUSTER_INSTANCE_LOAD_TIMEOUT);
final AmazonEC2 client = getEC2Client(awsRegion);
final List<Filter> filters = buildCloudNodeFilters(awsRegion);
return findReservations(client, filters, preference, new DescribeInstancesRequest()).stream()
if (Objects.nonNull(filter)) {
addAddressFilters(filters, filter.getAddress());
}
return findReservations(client, filters, timeout, new DescribeInstancesRequest()).stream()
.flatMap(reservation -> reservation.getInstances().stream())
.collect(Collectors.toList());
}
@@ -568,12 +576,18 @@ private List<Filter> buildTagsFilters(final AwsRegion region) {
.map(tags -> MapUtils.emptyIfNull(tags).entrySet().stream()
.map(entry -> new Filter().withName("tag:" + entry.getKey()).withValues(entry.getValue()))
.collect(Collectors.toList()))
.orElse(Collections.emptyList());
.orElse(new ArrayList<>());
}

private List<Filter> buildCloudNodeFilters(final AwsRegion region) {
final List<Filter> filters = buildTagsFilters(region);
filters.add(new Filter().withName(INSTANCE_STATE_NAME).withValues(RUNNING_STATE, PENDING_STATE));
return filters;
}

private void addAddressFilters(final List<Filter> filters, final String addressFilter) {
filters.add(new Filter()
.withName(NetworkUtils.isValidIpAddress(addressFilter) ? PRIVATE_IP_ADDRESS : INSTANCE_ID)
.withValues(addressFilter));
}
}
Original file line number Diff line number Diff line change
@@ -16,18 +16,21 @@

package com.epam.pipeline.manager.cloud.azure;

import com.epam.pipeline.controller.vo.FilterNodesVO;
import com.epam.pipeline.entity.cloud.CloudInstanceState;
import com.epam.pipeline.entity.cloud.InstanceDNSRecord;
import com.epam.pipeline.entity.cloud.InstanceTerminationState;
import com.epam.pipeline.entity.cloud.CloudInstanceOperationResult;
import com.epam.pipeline.entity.cloud.azure.AzureVirtualMachineStats;
import com.epam.pipeline.entity.cluster.InstanceDisk;
import com.epam.pipeline.entity.cluster.InstanceImage;
import com.epam.pipeline.entity.cluster.NodeInstance;
import com.epam.pipeline.entity.cluster.pool.NodePool;
import com.epam.pipeline.entity.pipeline.DiskAttachRequest;
import com.epam.pipeline.entity.pipeline.RunInstance;
import com.epam.pipeline.entity.region.*;
import com.epam.pipeline.entity.region.AbstractCloudRegion;
import com.epam.pipeline.entity.region.AzureRegion;
import com.epam.pipeline.entity.region.AzureRegionCredentials;
import com.epam.pipeline.entity.region.CloudProvider;
import com.epam.pipeline.exception.cloud.azure.AzureException;
import com.epam.pipeline.manager.CmdExecutor;
import com.epam.pipeline.manager.cloud.CloudInstanceService;
@@ -281,7 +284,7 @@ public void deleteInstanceTags(final AzureRegion region, final String runId, fin
}

@Override
public List<NodeInstance> getCloudNodes(final AzureRegion region) {
public List<NodeInstance> getCloudNodes(final AzureRegion region, final FilterNodesVO filter) {
throw new UnsupportedOperationException("Loading instances doesn't work with Azure provider yet.");
}

Original file line number Diff line number Diff line change
@@ -16,12 +16,12 @@

package com.epam.pipeline.manager.cloud.gcp;

import com.epam.pipeline.controller.vo.FilterNodesVO;
import com.epam.pipeline.entity.cloud.CloudInstanceState;
import com.epam.pipeline.entity.cloud.InstanceDNSRecord;
import com.epam.pipeline.entity.cloud.InstanceTerminationState;
import com.epam.pipeline.entity.cloud.CloudInstanceOperationResult;
import com.epam.pipeline.entity.cluster.InstanceDisk;
import com.epam.pipeline.entity.cluster.InstanceImage;
import com.epam.pipeline.entity.cluster.NodeInstance;
import com.epam.pipeline.entity.cluster.pool.NodePool;
import com.epam.pipeline.entity.pipeline.DiskAttachRequest;
@@ -247,7 +247,7 @@ public void deleteInstanceTags(final GCPRegion region, final String runId, final
}

@Override
public List<NodeInstance> getCloudNodes(final GCPRegion region) {
public List<NodeInstance> getCloudNodes(final GCPRegion region, final FilterNodesVO filter) {
throw new UnsupportedOperationException("Loading instances doesn't work with GCP provider yet.");
}

Original file line number Diff line number Diff line change
@@ -18,12 +18,12 @@

import com.epam.pipeline.common.MessageConstants;
import com.epam.pipeline.common.MessageHelper;
import com.epam.pipeline.controller.vo.FilterNodesVO;
import com.epam.pipeline.entity.cloud.CloudInstanceOperationResult;
import com.epam.pipeline.entity.cloud.CloudInstanceState;
import com.epam.pipeline.entity.cloud.InstanceDNSRecord;
import com.epam.pipeline.entity.cloud.InstanceTerminationState;
import com.epam.pipeline.entity.cluster.InstanceDisk;
import com.epam.pipeline.entity.cluster.InstanceImage;
import com.epam.pipeline.entity.cluster.NodeInstance;
import com.epam.pipeline.entity.cluster.pool.NodePool;
import com.epam.pipeline.entity.pipeline.DiskAttachRequest;
@@ -186,7 +186,7 @@ public void deleteInstanceTags(final LocalRegion region, final String runId, fin
}

@Override
public List<NodeInstance> getCloudNodes(final LocalRegion region) {
public List<NodeInstance> getCloudNodes(final LocalRegion region, final FilterNodesVO filter) {
throw new UnsupportedOperationException("Loading instances doesn't work with local region yet.");
}

Original file line number Diff line number Diff line change
@@ -53,8 +53,8 @@ public List<NodeInstance> getNodes(final MachineType machineType) {
}

@PostFilter(NODE_READ_FILTER)
public List<NodeInstance> filterNodes(final FilterNodesVO filterNodesVO) {
return nodesManager.filterNodes(filterNodesVO);
public List<NodeInstance> filterNodes(final FilterNodesVO filterNodesVO, final MachineType machineType) {
return nodesManager.filterNodes(filterNodesVO, machineType);
}

@PreAuthorize(CLOUD_NODE_READ)
100 changes: 68 additions & 32 deletions api/src/main/java/com/epam/pipeline/manager/cluster/NodesManager.java
Original file line number Diff line number Diff line change
@@ -82,6 +82,7 @@ public class NodesManager {

private static final String MASTER_LABEL = "node-role.kubernetes.io/master";
private static final int NODE_DOWN_ATTEMPTS = 10;
private static final String ACCESS_DENIED_MSG = "Access is denied.";

@Autowired
private MessageHelper messageHelper;
@@ -103,7 +104,7 @@ public class NodesManager {

@Autowired
private KubernetesManager kubernetesManager;

@Autowired
private NodeDiskManager nodeDiskManager;

@@ -143,34 +144,26 @@ public void init() {

}

public List<NodeInstance> filterNodes(FilterNodesVO filterNodesVO) {
List<NodeInstance> result;
try (KubernetesClient client = kubernetesManager.getKubernetesClient()) {
Map<String, String> labelsMap = new HashedMap<>();
if (StringUtils.isNotBlank(filterNodesVO.getRunId())) {
labelsMap.put(KubernetesConstants.RUN_ID_LABEL, filterNodesVO.getRunId());
}
Predicate<NodeInstance> addressFilter = node -> true;
if (StringUtils.isNotBlank(filterNodesVO.getAddress())) {
Predicate<NodeInstanceAddress> addressEqualsPredicate =
address -> StringUtils.isNotBlank(address.getAddress()) &&
address.getAddress().equalsIgnoreCase(filterNodesVO.getAddress());
addressFilter = node ->
node.getAddresses() != null && node.getAddresses()
.stream()
.filter(addressEqualsPredicate).count() > 0;
}
result = client.nodes()
.withLabels(labelsMap)
.list()
.getItems()
.stream()
.map(NodeInstance::new)
.filter(addressFilter)
.collect(Collectors.toList());
this.attachRunsInfo(result);
public List<NodeInstance> filterNodes(final FilterNodesVO filterNodesVO, final MachineType machineType) {
switch (machineType) {
case KUBE:
return filterKubeNodes(filterNodesVO);
case CLOUD:
if (!authManager.isAdmin()) {
throw new AccessDeniedException(ACCESS_DENIED_MSG);
}
return filterCloudNodes(filterNodesVO);
case ALL:
if (!authManager.isAdmin()) {
log.debug("Cloud nodes is not available for non-admin users. Only kube nodes will be filtered.");
return filterKubeNodes(filterNodesVO);
}
final List<NodeInstance> kubeNodes = filterKubeNodes(filterNodesVO);
final List<NodeInstance> cloudNodes = filterCloudNodes(filterNodesVO);
return mergeNodesByMachineType(kubeNodes, cloudNodes);
default:
throw new UnsupportedOperationException(String.format("Unsupported type '%s'", machineType));
}
return result;
}

public NodeInstance getNode(String name) {
@@ -369,7 +362,7 @@ public List<NodeInstance> getNodes(final MachineType machineType) {
return getKubeNodes();
case CLOUD:
if (!authManager.isAdmin()) {
throw new AccessDeniedException("Cloud nodes is not available for non-admin users");
throw new AccessDeniedException(ACCESS_DENIED_MSG);
}
return getCloudNodes();
case ALL:
@@ -506,15 +499,20 @@ private List<NodeInstance> getKubeNodes() {
}

private List<NodeInstance> getCloudNodes() {
return getCloudNodes(null);
}

private List<NodeInstance> getCloudNodes(final FilterNodesVO filterNodesVO) {
return ListUtils.emptyIfNull(regionManager.loadAll()).stream()
.filter(AbstractCloudRegion::isClusterInclude)
.flatMap(region -> getCloudNodesInRegion(region).stream())
.flatMap(region -> getCloudNodesInRegion(region, filterNodesVO).stream())
.collect(Collectors.toList());
}

private List<NodeInstance> getCloudNodesInRegion(final AbstractCloudRegion region) {
private List<NodeInstance> getCloudNodesInRegion(final AbstractCloudRegion region,
final FilterNodesVO filterNodesVO) {
try {
return cloudFacade.getCloudNodes(region.getId());
return cloudFacade.getCloudNodes(region.getId(), filterNodesVO);
} catch (Exception e) {
log.error(e.getMessage(), e);
return Collections.emptyList();
@@ -563,4 +561,42 @@ private Optional<NodeInstance> findCloudNodeInRegion(final AbstractCloudRegion r
return Optional.empty();
}
}

private List<NodeInstance> filterCloudNodes(final FilterNodesVO filterNodesVO) {
if (StringUtils.isNotBlank(filterNodesVO.getRunId())) {
// filters by runid are not applicable for cloud nodes
return new ArrayList<>();
}
return StringUtils.isBlank(filterNodesVO.getAddress()) ? getCloudNodes() : getCloudNodes(filterNodesVO);
}

private List<NodeInstance> filterKubeNodes(final FilterNodesVO filterNodesVO) {
List<NodeInstance> result;
try (KubernetesClient client = kubernetesManager.getKubernetesClient()) {
Map<String, String> labelsMap = new HashedMap<>();
if (StringUtils.isNotBlank(filterNodesVO.getRunId())) {
labelsMap.put(KubernetesConstants.RUN_ID_LABEL, filterNodesVO.getRunId());
}
Predicate<NodeInstance> addressFilter = node -> true;
if (StringUtils.isNotBlank(filterNodesVO.getAddress())) {
Predicate<NodeInstanceAddress> addressEqualsPredicate = address ->
StringUtils.isNotBlank(address.getAddress()) &&
address.getAddress().equalsIgnoreCase(filterNodesVO.getAddress());
addressFilter = node ->
node.getAddresses() != null && node.getAddresses()
.stream()
.filter(addressEqualsPredicate).count() > 0;
}
result = client.nodes()
.withLabels(labelsMap)
.list()
.getItems()
.stream()
.map(NodeInstance::new)
.filter(addressFilter)
.collect(Collectors.toList());
this.attachRunsInfo(result);
}
return result;
}
}
Empty file.
Empty file.

0 comments on commit 3547a1e

Please sign in to comment.