[Feature][scaleph-engine-flink-kubernetes] support additional dependencies #582

Merged · 14 commits · Jul 30, 2023
FlinkVersionConfig.java
@@ -42,20 +42,20 @@ public enum FlinkVersionConfig {
FlinkImageType.SQL,
"1.16",
Arrays.asList("1.16.0", "1.16.1", "1.16.2"),
- "ghcr.io/flowerfine/scaleph/scaleph-sql-templates:1.16");
+ "ghcr.io/flowerfine/scaleph/scaleph-sql-template:1.16");

public static final FlinkVersionProperties FLINK_SQL_1_17 =
new FlinkVersionProperties(
FlinkImageType.SQL,
"1.17",
Arrays.asList("1.17.0", "1.17.1"),
- "ghcr.io/flowerfine/scaleph/scaleph-sql-templates:1.17");
+ "ghcr.io/flowerfine/scaleph/scaleph-sql-template:1.17");

public static final FlinkVersionProperties FLINK_SEATUNNEL_1_15 =
new FlinkVersionProperties(
FlinkImageType.SEATUNNEL,
"1.15",
- Arrays.asList("1.16.0", "1.16.1", "1.16.2"),
+ Arrays.asList("1.15.0", "1.15.1", "1.15.2", "1.15.3", "1.15.4"),
"ghcr.io/flowerfine/scaleph-seatunnel:2.3.1-flink-1.15");

public static String findImage(FlinkImageType type, String flinkVersion) {
@@ -67,7 +67,7 @@ public static String findImage(FlinkImageType type, String flinkVersion) {
case SEATUNNEL:
return doFindImage(flinkVersion, FLINK_SEATUNNEL_1_15.getImage(), FLINK_SEATUNNEL_1_15);
default:
- return null;
+ return FLINK_JAR_1_17.getImage();
}
}
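
With the default branch falling back to FLINK_JAR_1_17's image instead of returning null, findImage always resolves to a usable image. doFindImage itself is outside this diff; below is a minimal sketch of the matching it presumably performs — the body and the getSupportedVersions() accessor are assumptions, only getImage() appears in the diff.

```java
// Sketch only — doFindImage's real body is not part of this diff.
// Assumed behavior: return the first candidate profile whose supported
// versions contain the requested Flink version, else the supplied default.
private static String doFindImage(String flinkVersion, String defaultImage, FlinkVersionProperties... candidates) {
    for (FlinkVersionProperties candidate : candidates) {
        // getSupportedVersions() is an assumed accessor for the Arrays.asList(...) constructor argument
        if (candidate.getSupportedVersions().contains(flinkVersion)) {
            return candidate.getImage();
        }
    }
    return defaultImage;
}
```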

ResourceNames.java
@@ -39,7 +39,7 @@ public enum ResourceNames {
public static final String JAR_LOCAL_PATH = LOCAL_SCHEMA + SCALEPH_JAR_DIRECTORY;

public static final String FILE_FETCHER_FLINK_VOLUME_NAME = "file-fetcher-flink-volume";
- public static final String LIB_DIRECTORY = "/flink/usrlib/";
+ public static final String LIB_DIRECTORY = "/scaleph/usrlib/";
public static final String LIB_LOCAL_PATH = LOCAL_SCHEMA + LIB_DIRECTORY;

public static final String SQL_DIRECTORY = "/scaleph/sql/";
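
Moving LIB_DIRECTORY from /flink/usrlib/ to /scaleph/usrlib/ keeps all Scaleph-managed paths under the /scaleph prefix; the same constant is exported as the CLASSPATH value by the buildJavaOptsEnv() added to FileFetcherHandler below, so dependencies fetched into this directory land on the job's classpath. A small illustration, assuming LOCAL_SCHEMA is Flink's local:// scheme (its definition is outside this hunk):

```java
// Assumption: LOCAL_SCHEMA = "local://" — not shown in this hunk.
String libLocalPath = "local://" + "/scaleph/usrlib/"; // LOCAL_SCHEMA + LIB_DIRECTORY
// -> "local:///scaleph/usrlib/", a container-local URI Flink can load jars from
```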
FlinkDeploymentSpecHandler.java
@@ -18,14 +18,18 @@

package cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.job.instance;

- import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec;
+ import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.*;
+ import cn.sliew.scaleph.engine.flink.kubernetes.operator.util.TemplateMerger;
import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FileSystemPluginHandler;
+ import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkImageHandler;
import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkJobServiceHandler;
import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkStateStorageHandler;
import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO;
+ import org.apache.commons.lang3.EnumUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

+ import java.util.Map;
import java.util.Optional;

@Component
@@ -39,13 +43,18 @@ public class FlinkDeploymentSpecHandler {
private FlinkStateStorageHandler flinkStateStorageHandler;
@Autowired
private FlinkJobServiceHandler flinkJobServiceHandler;
+ @Autowired
+ private FlinkImageHandler flinkImageHandler;

public FlinkDeploymentSpec handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec flinkDeploymentSpec) {
FlinkDeploymentSpec spec = Optional.ofNullable(flinkDeploymentSpec).orElse(new FlinkDeploymentSpec());
addArtifact(jobInstanceDTO, spec);
enableFileSystem(jobInstanceDTO, spec);
enableFlinkStateStore(jobInstanceDTO, spec);
addService(spec);
+ addImage(jobInstanceDTO, spec);
+
+ mergeJobInstance(jobInstanceDTO, spec);
return spec;
}

@@ -64,4 +73,22 @@ private void enableFlinkStateStore(WsFlinkKubernetesJobInstanceDT
private void addService(FlinkDeploymentSpec spec) {
flinkJobServiceHandler.handle(spec);
}

+ private void addImage(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) {
+ flinkImageHandler.handle(jobInstanceDTO, spec);
+ }
+
+ private void mergeJobInstance(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) {
+ spec.setJobManager(TemplateMerger.merge(spec.getJobManager(), jobInstanceDTO.getJobManager(), JobManagerSpec.class));
+ spec.setTaskManager(TemplateMerger.merge(spec.getTaskManager(), jobInstanceDTO.getTaskManager(), TaskManagerSpec.class));
+ spec.setFlinkConfiguration(TemplateMerger.merge(spec.getFlinkConfiguration(), jobInstanceDTO.getUserFlinkConfiguration(), Map.class));
+ JobSpec job = spec.getJob();
+ if (jobInstanceDTO.getParallelism() != null) {
+ job.setParallelism(jobInstanceDTO.getParallelism());
+ }
+ if (jobInstanceDTO.getUpgradeMode() != null) {
+ job.setUpgradeMode(EnumUtils.getEnum(UpgradeMode.class, jobInstanceDTO.getUpgradeMode().name()));
+ }
+ job.setAllowNonRestoredState(jobInstanceDTO.getAllowNonRestoredState());
+ }
}
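
mergeJobInstance layers per-instance settings over the spec derived from the template: JobManager/TaskManager resources, user Flink configuration, parallelism, and upgrade mode. TemplateMerger lives elsewhere in the repository; the sketch below captures the merge semantics assumed here — instance fields override, nulls fall back to the template — and is illustrative, not the committed code.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Hypothetical stand-in for TemplateMerger: deep-merges two beans via their
// JSON trees, letting non-null instance values override the template.
public final class TemplateMergerSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static <T> T merge(Object template, Object instance, Class<T> clazz) {
        if (template == null) {
            return MAPPER.convertValue(instance, clazz);
        }
        if (instance == null) {
            return MAPPER.convertValue(template, clazz);
        }
        JsonNode merged = deepMerge(MAPPER.valueToTree(template), MAPPER.valueToTree(instance));
        return MAPPER.convertValue(merged, clazz);
    }

    private static JsonNode deepMerge(JsonNode base, JsonNode override) {
        if (base instanceof ObjectNode && override instanceof ObjectNode) {
            ObjectNode result = ((ObjectNode) base).deepCopy();
            override.fields().forEachRemaining(entry ->
                    result.set(entry.getKey(), deepMerge(result.get(entry.getKey()), entry.getValue())));
            return result;
        }
        // scalars and arrays: the override wins unless it is null
        return override == null || override.isNull() ? base : override;
    }
}
```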
FileFetcherHandler.java
@@ -18,17 +18,22 @@

package cn.sliew.scaleph.engine.flink.kubernetes.resource.handler;

import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.common.dict.image.ImagePullPolicy;
import cn.sliew.scaleph.config.resource.ResourceNames;
import cn.sliew.scaleph.config.storage.S3FileSystemProperties;
import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkArtifactJar;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.JobManagerSpec;
import cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.job.FlinkDeploymentJob;
import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobDTO;
import cn.sliew.scaleph.resource.service.JarService;
import cn.sliew.scaleph.resource.service.dto.JarDTO;
import io.fabric8.kubernetes.api.model.*;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.*;

@@ -40,16 +45,25 @@ public class FileFetcherHandler {

private static final String ENDPOINT = "MINIO_ENDPOINT";

+ private static final String ENV_JAVA_OPTS_ALL_NAME = "CLASSPATH";

@Autowired(required = false)
private S3FileSystemProperties s3FileSystemProperties;
+ @Autowired
+ private JarService jarService;

public void handleJarArtifact(WsFlinkKubernetesJobDTO jobDTO, FlinkDeploymentSpec spec) {
+ List<FileFetcherParam> files = collectFiles(jobDTO);
+ if (CollectionUtils.isEmpty(files)) {
+ return;
+ }

PodBuilder podBuilder = Optional.ofNullable(spec.getPodTemplate()).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder());
handlePodTemplate(podBuilder);
spec.setPodTemplate(podBuilder.build());

JobManagerSpec jobManager = Optional.ofNullable(spec.getJobManager()).orElse(new JobManagerSpec());
- handleJobManagerPodTemplate(jobDTO, jobManager);
+ handleJobManagerPodTemplate(jobDTO, jobManager, files);
spec.setJobManager(jobManager);
}

@@ -61,60 +75,76 @@ private void handlePodTemplate(PodBuilder builder) {

spec.addAllToVolumes(buildVolume()); // add volumes
ContainerUtil.findFlinkMainContainer(spec)
+ .addAllToEnv(buildJavaOptsEnv())
.addAllToVolumeMounts(buildVolumeMount()) // add volume mount
.endContainer();

spec.endSpec();
}

- private void handleJobManagerPodTemplate(WsFlinkKubernetesJobDTO jobDTO, JobManagerSpec jobManager) {
+ private void handleJobManagerPodTemplate(WsFlinkKubernetesJobDTO jobDTO, JobManagerSpec jobManager, List<FileFetcherParam> files) {
PodBuilder builder = Optional.of(jobManager).map(JobManagerSpec::getPodTemplate).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder());
- doHandle(jobDTO, builder);
+ doHandle(builder, files);
jobManager.setPodTemplate(builder.build());
}

- private void doHandle(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) {
+ private void doHandle(PodBuilder builder, List<FileFetcherParam> files) {
builder.editOrNewMetadata()
.withName(ResourceNames.JOB_MANAGER_POD_TEMPLATE_NAME)
.endMetadata();
- addArtifactJar(jobDTO, builder);
- addAdditionalJars(jobDTO, builder);
+ addFileFetcherInitContainers(builder, files);
}

- private void addArtifactJar(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) {
+ private List<FileFetcherParam> collectFiles(WsFlinkKubernetesJobDTO jobDTO) {
+ List<FileFetcherParam> result = new ArrayList<>();
+ addArtifactJar(jobDTO, result);
+ addAdditionalJars(jobDTO, result);
+ return result;
+ }
+
+ private void addArtifactJar(WsFlinkKubernetesJobDTO jobDTO, List<FileFetcherParam> result) {
if (jobDTO.getFlinkArtifactJar() == null) {
return;
}

switch (jobDTO.getDeploymentKind()) {
case FLINK_DEPLOYMENT:
- doAddJars(jobDTO.getFlinkArtifactJar(), builder);
- return;
+ result.add(new FileFetcherParam(jobDTO.getFlinkArtifactJar().getPath(), ResourceNames.SCALEPH_JAR_DIRECTORY + jobDTO.getFlinkArtifactJar().getFileName()));
+ break;
case FLINK_SESSION_JOB:
break;
default:
}
}

- private void addAdditionalJars(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) {
+ private void addAdditionalJars(WsFlinkKubernetesJobDTO jobDTO, List<FileFetcherParam> result) {
switch (jobDTO.getDeploymentKind()) {
case FLINK_DEPLOYMENT:
- addAdditionalJars();
- return;
+ doAddAdditionalJars(jobDTO.getFlinkDeployment().getAdditionalDependencies(), result);
+ break;
case FLINK_SESSION_JOB:
break;
default:
}
}

- private void doAddJars(WsFlinkArtifactJar jarArtifact, PodBuilder builder) {
- builder.editOrNewSpec().addToInitContainers(addJarArtifact(jarArtifact)).endSpec();
+ private void doAddAdditionalJars(List<Long> additionalDependencies, List<FileFetcherParam> result) {
+ for (Long jarId : additionalDependencies) {
+ JarDTO jarDTO = jarService.getRaw(jarId);
+ result.add(new FileFetcherParam(jarDTO.getPath(), ResourceNames.LIB_DIRECTORY + jarDTO.getFileName()));
+ }
}

- private Container addJarArtifact(WsFlinkArtifactJar jarArtifact) {
+ private void addFileFetcherInitContainers(PodBuilder builder, List<FileFetcherParam> files) {
+ builder.editOrNewSpec().addToInitContainers(buildInitContainer(files)).endSpec();
+ }
+
+ private Container buildInitContainer(List<FileFetcherParam> files) {
ContainerBuilder builder = new ContainerBuilder();
builder.withName(ResourceNames.FILE_FETCHER_CONTAINER_NAME);
builder.withImage(ResourceNames.FILE_FETCHER_CONTAINER_IMAGE);
builder.withImagePullPolicy(ImagePullPolicy.IF_NOT_PRESENT.getValue());
- builder.withArgs(buildFileFetcherArgs(jarArtifact));
+ builder.withArgs(buildFileFetcherArgs(files));
builder.withEnv(buildEnvs());
builder.withResources(buildResource());
builder.withVolumeMounts(buildVolumeMount());
@@ -123,13 +153,8 @@ private Container addJarArtifact(WsFlinkArtifactJar jarArtifact) {
return builder.build();
}

- private void addAdditionalJars() {
-
- }

- private List<String> buildFileFetcherArgs(WsFlinkArtifactJar jarArtifact) {
- return Arrays.asList("-uri", jarArtifact.getPath(),
- "-path", ResourceNames.SCALEPH_JAR_DIRECTORY + jarArtifact.getFileName());
+ private List<String> buildFileFetcherArgs(List<FileFetcherParam> files) {
+ return Arrays.asList("-file-fetcher-json", JacksonUtil.toJsonString(files));
}

private List<EnvVar> buildEnvs() {
@@ -152,6 +177,12 @@ private ResourceRequirements buildResource() {
return resourceRequirementsBuilder.build();
}

+ private List<EnvVar> buildJavaOptsEnv() {
+ EnvVarBuilder builder = new EnvVarBuilder();
+ builder.withName(ENV_JAVA_OPTS_ALL_NAME);
+ builder.withValue(ResourceNames.LIB_DIRECTORY);
+ return Collections.singletonList(builder.build());
+ }

private List<VolumeMount> buildVolumeMount() {
VolumeMountBuilder scalephLib = new VolumeMountBuilder();
@@ -175,4 +206,12 @@ private List<Volume> buildVolume() {
return Arrays.asList(scalephLib.build(), flinkLib.build());
}

+ @Getter
+ @Setter
+ @AllArgsConstructor
+ private static class FileFetcherParam {
+ private String uri;
+ private String path;
+ }

}
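
Replacing the single -uri/-path pair with one -file-fetcher-json argument is what lets the init container fetch any number of artifacts. A self-contained sketch of the argument it now receives — the S3 URIs, the /scaleph/jar/ artifact directory, and plain Jackson standing in for milky's JacksonUtil are assumptions (Java 16+ records, Jackson 2.12+):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.List;

public class FileFetcherArgsDemo {
    // mirrors the private FileFetcherParam added above
    public record Param(String uri, String path) {}

    public static void main(String[] args) throws Exception {
        List<Param> files = Arrays.asList(
                new Param("s3://scaleph/release/app.jar", "/scaleph/jar/app.jar"),  // artifact jar (directory assumed)
                new Param("s3://scaleph/lib/udf.jar", "/scaleph/usrlib/udf.jar"));  // additional dependency
        // the single argument handed to the file-fetcher init container
        System.out.println("-file-fetcher-json " + new ObjectMapper().writeValueAsString(files));
        // -file-fetcher-json [{"uri":"s3://scaleph/release/app.jar","path":"/scaleph/jar/app.jar"},
        //                     {"uri":"s3://scaleph/lib/udf.jar","path":"/scaleph/usrlib/udf.jar"}]
    }
}
```

The init container is then expected to download each uri to its path before the Flink main container starts, with /scaleph/usrlib/ exported as CLASSPATH so the fetched jars are visible to the job.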
FlinkImageHandler.java (new file)
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.engine.flink.kubernetes.resource.handler;

import cn.sliew.scaleph.common.dict.flink.FlinkVersion;
import cn.sliew.scaleph.config.flink.FlinkImageType;
import cn.sliew.scaleph.config.flink.FlinkVersionConfig;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec;
import cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.job.instance.FlinkJobInstanceConverterFactory;
import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO;
import org.springframework.stereotype.Component;

@Component
public class FlinkImageHandler {

public void handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) {
FlinkVersion flinkVersion = FlinkJobInstanceConverterFactory.getFlinkVersion(jobInstanceDTO.getWsFlinkKubernetesJob());
String image;
switch (jobInstanceDTO.getWsFlinkKubernetesJob().getType()) {
case JAR:
image = FlinkVersionConfig.findImage(FlinkImageType.JAR, flinkVersion.getValue());
break;
case SQL:
image = FlinkVersionConfig.findImage(FlinkImageType.SQL, flinkVersion.getValue());
break;
case SEATUNNEL:
image = FlinkVersionConfig.findImage(FlinkImageType.SEATUNNEL, flinkVersion.getValue());
spec.setFlinkVersion(cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkVersion.v1_15);
break;
default:
image = FlinkVersionConfig.FLINK_JAR_1_17.getImage();
}
spec.setImage(image);
}
}
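
A hypothetical look at the resolution this handler performs, assuming FlinkVersion.getValue() yields plain version strings like "1.15.4". Note the SEATUNNEL branch also pins the operator-level flinkVersion to v1_15, matching the Flink 1.15 base of the scaleph-seatunnel:2.3.1-flink-1.15 image.

```java
// Hypothetical usage of the config this handler delegates to:
String seatunnelImage = FlinkVersionConfig.findImage(FlinkImageType.SEATUNNEL, "1.15.4");
// -> "ghcr.io/flowerfine/scaleph-seatunnel:2.3.1-flink-1.15"
String sqlImage = FlinkVersionConfig.findImage(FlinkImageType.SQL, "1.17.1");
// -> "ghcr.io/flowerfine/scaleph/scaleph-sql-template:1.17"
```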
FlinkKubernetesOperatorService.java
@@ -39,4 +39,12 @@ public interface FlinkKubernetesOperatorService {

void shutdownJob(Long clusterCredentialId, String job) throws Exception;

+ void restartJob(Long clusterCredentialId, String job) throws Exception;
+
+ void triggerSavepoint(Long clusterCredentialId, String job) throws Exception;
+
+ void suspendJob(Long clusterCredentialId, String job) throws Exception;
+
+ void resumeJob(Long clusterCredentialId, String job) throws Exception;

}
WsFlinkKubernetesJobInstanceService.java
@@ -48,6 +48,8 @@ public interface WsFlinkKubernetesJobInstanceService {

Optional<GenericKubernetesResource> getStatusWithoutManagedFields(Long id);

+ Optional<GenericKubernetesResource> getJobWithoutStatus(Long id);

int updateStatus(Long id, FlinkDeploymentStatus status);

int clearStatus(Long id);
FlinkKubernetesOperatorService implementation
@@ -102,4 +102,28 @@ public void shutdownJob(Long clusterCredentialId, String job) throws Exception {
KubernetesClient client = kubernetesService.getClient(clusterCredentialId);
client.load(new ByteArrayInputStream((job).getBytes())).delete();
}

+ @Override
+ public void restartJob(Long clusterCredentialId, String job) throws Exception {
+ KubernetesClient client = kubernetesService.getClient(clusterCredentialId);
+ client.load(new ByteArrayInputStream((job).getBytes())).delete();
+ }
+
+ @Override
+ public void triggerSavepoint(Long clusterCredentialId, String job) throws Exception {
+ KubernetesClient client = kubernetesService.getClient(clusterCredentialId);
+ client.load(new ByteArrayInputStream((job).getBytes())).delete();
+ }
+
+ @Override
+ public void suspendJob(Long clusterCredentialId, String job) throws Exception {
+ KubernetesClient client = kubernetesService.getClient(clusterCredentialId);
+ client.load(new ByteArrayInputStream((job).getBytes())).delete();
+ }
+
+ @Override
+ public void resumeJob(Long clusterCredentialId, String job) throws Exception {
+ KubernetesClient client = kubernetesService.getClient(clusterCredentialId);
+ client.load(new ByteArrayInputStream((job).getBytes())).delete();
+ }
}
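
As committed, the four new methods reuse shutdownJob's delete-based body. For comparison, the Flink Kubernetes Operator's documented trigger for a manual savepoint is a change to spec.job.savepointTriggerNonce; below is a sketch of a nonce-based triggerSavepoint under that approach — fabric8 6.x API assumed, not the committed implementation.

```java
// Sketch, not the committed code. Drop-in for the same service implementation,
// reusing its kubernetesService field. Assumes fabric8 6.x (items(), update()).
// import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
// import java.io.ByteArrayInputStream; import java.util.Map;
public void triggerSavepointViaNonce(Long clusterCredentialId, String job) throws Exception {
    KubernetesClient client = kubernetesService.getClient(clusterCredentialId);
    GenericKubernetesResource deployment = (GenericKubernetesResource) client
            .load(new ByteArrayInputStream(job.getBytes()))
            .items()
            .get(0);
    Map<String, Object> spec = (Map<String, Object>) deployment.getAdditionalProperties().get("spec");
    Map<String, Object> jobSpec = (Map<String, Object>) spec.get("job");
    jobSpec.put("savepointTriggerNonce", System.currentTimeMillis()); // any new value triggers a savepoint
    client.resource(deployment).update(); // patch the live FlinkDeployment instead of deleting it
}
```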