Skip to content

Commit

Permalink
[AMORO-2979] Support K8S Pod Template From KubernetesOptimizer (#3072)
Browse files Browse the repository at this point in the history
[AMORO-2979] Added podTemplate support and test unit(#2979)
  • Loading branch information
wangyuxiang123 authored Sep 5, 2024
1 parent a52f5d4 commit 04fef53
Show file tree
Hide file tree
Showing 5 changed files with 528 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@

package org.apache.amoro.server.manager;

import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.LocalObjectReference;
import io.fabric8.kubernetes.api.model.LocalObjectReferenceBuilder;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
Expand All @@ -34,12 +40,15 @@
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -54,7 +63,7 @@ public class KubernetesOptimizerContainer extends AbstractResourceContainer {
public static final String NAMESPACE = "namespace";
public static final String IMAGE = "image";
public static final String PULL_POLICY = "pullPolicy";

public static final String PODTEMPLATE = "podTemplate";
public static final String PULL_SECRETS = "imagePullSecrets";
public static final String KUBE_CONFIG_PATH = "kube-config-path";

Expand All @@ -79,9 +88,67 @@ protected Map<String, String> doScaleOut(Resource resource) {
groupProperties.putAll(getContainerProperties());
groupProperties.putAll(resource.getProperties());

// generate pod start args
long memoryPerThread = Long.parseLong(checkAndGetProperty(groupProperties, MEMORY_PROPERTY));
long memory = memoryPerThread * resource.getThreadCount();
Map<String, Object> argsList = generatePodStartArgs(resource, groupProperties);
String image = argsList.get(IMAGE).toString();
String namespace = argsList.get(NAMESPACE).toString();
String pullPolicy = argsList.get(PULL_POLICY).toString();
List<LocalObjectReference> imagePullSecretsList =
(List<LocalObjectReference>) argsList.get(PULL_SECRETS);
int cpuLimit = (int) argsList.get("cpuLimit");
long memory = (long) argsList.get(MEMORY_PROPERTY);
String groupName = argsList.get("groupName").toString();
String resourceId = argsList.get("resourceId").toString();
String startUpArgs = argsList.get("startUpArgs").toString();

String kubernetesName = NAME_PREFIX + resourceId;
Deployment deployment;

if (null != groupProperties.get(PODTEMPLATE)) {
// configure the podTemplate read from config
PodTemplate podTemplate = initPodTemplateFromLocal(groupProperties);

deployment =
initPodTemplateFromFrontEnd(
podTemplate,
image,
pullPolicy,
cpuLimit,
groupName,
resourceId,
startUpArgs,
memory,
imagePullSecretsList);
} else {
deployment =
initPodTemplateWithoutConfig(
image,
pullPolicy,
cpuLimit,
groupName,
resourceId,
startUpArgs,
memory,
imagePullSecretsList);
}

client.apps().deployments().inNamespace(namespace).resource(deployment).create();
Map<String, String> startupProperties = Maps.newHashMap();
startupProperties.put(NAMESPACE, namespace);
startupProperties.put(KUBERNETES_NAME_PROPERTIES, kubernetesName);
return startupProperties;
}

public Map<String, Object> generatePodStartArgs(
Resource resource, Map<String, String> groupProperties) {
long memoryPerThread;
long memory;

if (resource.getMemoryMb() > 0) {
memory = resource.getMemoryMb();
} else {
memoryPerThread = Long.parseLong(checkAndGetProperty(groupProperties, MEMORY_PROPERTY));
memory = memoryPerThread * resource.getThreadCount();
}
// point at amoro home in docker image
String startUpArgs =
String.format(
Expand All @@ -104,7 +171,31 @@ protected Map<String, String> doScaleOut(Resource resource) {

String resourceId = resource.getResourceId();
String groupName = resource.getGroupName();
String kubernetesName = NAME_PREFIX + resourceId;

Map<String, Object> argsList = Maps.newHashMap();
argsList.put(NAMESPACE, namespace);
argsList.put(IMAGE, image);
argsList.put(PULL_POLICY, pullPolicy);
argsList.put(PULL_SECRETS, imagePullSecretsList);
argsList.put(MEMORY_PROPERTY, memory);
argsList.put("cpuLimit", cpuLimit);
argsList.put("resourceId", resourceId);
argsList.put("groupName", groupName);
argsList.put("startUpArgs", startUpArgs);

return argsList;
}

public Deployment initPodTemplateWithoutConfig(
String image,
String pullPolicy,
int cpuLimit,
String groupName,
String resourceId,
String startUpArgs,
long memory,
List<LocalObjectReference> imagePullSecretsList) {

DeploymentBuilder deploymentBuilder =
new DeploymentBuilder()
.withNewMetadata()
Expand Down Expand Up @@ -156,13 +247,80 @@ protected Map<String, String> doScaleOut(Resource resource) {
.endTemplate()
.endSpec();
}
Deployment deployment = deploymentBuilder.build();

client.apps().deployments().inNamespace(namespace).resource(deployment).create();
Map<String, String> startupProperties = Maps.newHashMap();
startupProperties.put(NAMESPACE, namespace);
startupProperties.put(KUBERNETES_NAME_PROPERTIES, kubernetesName);
return startupProperties;
return deploymentBuilder.build();
}

public PodTemplate initPodTemplateFromLocal(Map<String, String> groupProperties) {
return new Yaml().loadAs(groupProperties.get(PODTEMPLATE), PodTemplate.class);
}

public Deployment initPodTemplateFromFrontEnd(
PodTemplate podTemplate,
String image,
String pullPolicy,
int cpuLimit,
String groupName,
String resourceId,
String startUpArgs,
long memory,
List<LocalObjectReference> imagePullSecretsList) {
podTemplate
.getTemplate()
.getMetadata()
.setLabels(
new HashMap<String, String>() {
{
put("app", NAME_PREFIX + resourceId);
put("AmoroOptimizerGroup", groupName);
put("AmoroResourceId", resourceId);
}
});

Container container = new Container();
container.setName("optimizer");
container.setImage(image);
container.setImagePullPolicy(pullPolicy);
container.setCommand(new ArrayList<>(Arrays.asList("sh", "-c", startUpArgs)));

ResourceRequirements resourceRequirements = new ResourceRequirements();
resourceRequirements.setLimits(
ImmutableMap.of(
"memory", new Quantity(memory + "Mi"),
"cpu", new Quantity(cpuLimit + "")));
resourceRequirements.setRequests(
ImmutableMap.of(
"memory", new Quantity(memory + "Mi"),
"cpu", new Quantity(cpuLimit + "")));
container.setResources(resourceRequirements);

podTemplate.getTemplate().getSpec().getContainers().set(0, container);

if (!imagePullSecretsList.isEmpty()) {
podTemplate.getTemplate().getSpec().setImagePullSecrets(imagePullSecretsList);
}

DeploymentSpec deploymentSpec = new DeploymentSpec();
deploymentSpec.setTemplate(podTemplate.getTemplate());

LabelSelector labelSelector = new LabelSelector();
labelSelector.setMatchLabels(
new HashMap<String, String>() {
{
put("app", NAME_PREFIX + resourceId);
}
});

deploymentSpec.setSelector(labelSelector);
deploymentSpec.setReplicas(1);

Deployment deployment = new Deployment();
deployment.setSpec(deploymentSpec);
ObjectMeta deploymentMetadata = new ObjectMeta();
deploymentMetadata.setName(NAME_PREFIX + resourceId);
deployment.setMetadata(deploymentMetadata);

return deployment;
}

@Override
Expand Down
Loading

0 comments on commit 04fef53

Please sign in to comment.