diff --git a/scaleph-config/src/main/java/cn/sliew/scaleph/config/flink/FlinkVersionConfig.java b/scaleph-config/src/main/java/cn/sliew/scaleph/config/flink/FlinkVersionConfig.java index 5284cd5f5..607ace436 100644 --- a/scaleph-config/src/main/java/cn/sliew/scaleph/config/flink/FlinkVersionConfig.java +++ b/scaleph-config/src/main/java/cn/sliew/scaleph/config/flink/FlinkVersionConfig.java @@ -42,20 +42,20 @@ public enum FlinkVersionConfig { FlinkImageType.SQL, "1.16", Arrays.asList("1.16.0", "1.16.1", "1.16.2"), - "ghcr.io/flowerfine/scaleph/scaleph-sql-templates:1.16"); + "ghcr.io/flowerfine/scaleph/scaleph-sql-template:1.16"); public static final FlinkVersionProperties FLINK_SQL_1_17 = new FlinkVersionProperties( FlinkImageType.SQL, "1.17", Arrays.asList("1.17.0", "1.17.1"), - "ghcr.io/flowerfine/scaleph/scaleph-sql-templates:1.17"); + "ghcr.io/flowerfine/scaleph/scaleph-sql-template:1.17"); public static final FlinkVersionProperties FLINK_SEATUNNEL_1_15 = new FlinkVersionProperties( FlinkImageType.SEATUNNEL, "1.15", - Arrays.asList("1.16.0", "1.16.1", "1.16.2"), + Arrays.asList("1.15.0", "1.15.1", "1.15.2", "1.15.3", "1.15.4"), "ghcr.io/flowerfine/scaleph-seatunnel:2.3.1-flink-1.15"); public static String findImage(FlinkImageType type, String flinkVersion) { @@ -67,7 +67,7 @@ public static String findImage(FlinkImageType type, String flinkVersion) { case SEATUNNEL: return doFindImage(flinkVersion, FLINK_SEATUNNEL_1_15.getImage(), FLINK_SEATUNNEL_1_15); default: - return null; + return FLINK_JAR_1_17.getImage(); } } diff --git a/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java b/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java index e6cae6e68..e0bfd0094 100644 --- a/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java +++ b/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java @@ -39,7 +39,7 @@ public enum ResourceNames { public static final String JAR_LOCAL_PATH = LOCAL_SCHEMA + SCALEPH_JAR_DIRECTORY; public static final String FILE_FETCHER_FLINK_VOLUME_NAME = "file-fetcher-flink-volume"; - public static final String LIB_DIRECTORY = "/flink/usrlib/"; + public static final String LIB_DIRECTORY = "/scaleph/usrlib/"; public static final String LIB_LOCAL_PATH = LOCAL_SCHEMA + LIB_DIRECTORY; public static final String SQL_DIRECTORY = "/scaleph/sql/"; diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java index c5768413f..d62dbb395 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java @@ -18,14 +18,18 @@ package cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.job.instance; -import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec; +import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.*; +import cn.sliew.scaleph.engine.flink.kubernetes.operator.util.TemplateMerger; import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FileSystemPluginHandler; +import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkImageHandler; import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkJobServiceHandler; import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkStateStorageHandler; import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO; +import org.apache.commons.lang3.EnumUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.Map; import java.util.Optional; @Component @@ -39,6 +43,8 @@ public class FlinkDeploymentSpecHandler { private FlinkStateStorageHandler flinkStateStorageHandler; @Autowired private FlinkJobServiceHandler flinkJobServiceHandler; + @Autowired + private FlinkImageHandler flinkImageHandler; public FlinkDeploymentSpec handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec flinkDeploymentSpec) { FlinkDeploymentSpec spec = Optional.ofNullable(flinkDeploymentSpec).orElse(new FlinkDeploymentSpec()); @@ -46,6 +52,9 @@ public FlinkDeploymentSpec handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO enableFileSystem(jobInstanceDTO, spec); enableFlinkStateStore(jobInstanceDTO, spec); addService(spec); + addImage(jobInstanceDTO, spec); + + mergeJobInstance(jobInstanceDTO, spec); return spec; } @@ -64,4 +73,22 @@ private void enableFlinkStateStore(WsFlinkKubernetesJobInstanceDTO jobInstanceDT private void addService(FlinkDeploymentSpec spec) { flinkJobServiceHandler.handle(spec); } + + private void addImage(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) { + flinkImageHandler.handle(jobInstanceDTO, spec); + } + + private void mergeJobInstance(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) { + spec.setJobManager(TemplateMerger.merge(spec.getJobManager(), jobInstanceDTO.getJobManager(), JobManagerSpec.class)); + spec.setTaskManager(TemplateMerger.merge(spec.getTaskManager(), jobInstanceDTO.getTaskManager(), TaskManagerSpec.class)); + spec.setFlinkConfiguration(TemplateMerger.merge(spec.getFlinkConfiguration(), jobInstanceDTO.getUserFlinkConfiguration(), Map.class)); + JobSpec job = spec.getJob(); + if (jobInstanceDTO.getParallelism() != null) { + job.setParallelism(jobInstanceDTO.getParallelism()); + } + if (jobInstanceDTO.getUpgradeMode() != null) { + job.setUpgradeMode(EnumUtils.getEnum(UpgradeMode.class, jobInstanceDTO.getUpgradeMode().name())); + } + job.setAllowNonRestoredState(jobInstanceDTO.getAllowNonRestoredState()); + } } diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FileFetcherHandler.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FileFetcherHandler.java index 884a813d1..1fc869a44 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FileFetcherHandler.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FileFetcherHandler.java @@ -18,17 +18,22 @@ package cn.sliew.scaleph.engine.flink.kubernetes.resource.handler; +import cn.sliew.milky.common.util.JacksonUtil; import cn.sliew.scaleph.common.dict.image.ImagePullPolicy; import cn.sliew.scaleph.config.resource.ResourceNames; import cn.sliew.scaleph.config.storage.S3FileSystemProperties; -import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkArtifactJar; import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec; import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.JobManagerSpec; -import cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.job.FlinkDeploymentJob; import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobDTO; +import cn.sliew.scaleph.resource.service.JarService; +import cn.sliew.scaleph.resource.service.dto.JarDTO; import io.fabric8.kubernetes.api.model.*; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.util.*; @@ -40,16 +45,25 @@ public class FileFetcherHandler { private static final String ENDPOINT = "MINIO_ENDPOINT"; + private static final String ENV_JAVA_OPTS_ALL_NAME = "CLASSPATH"; + @Autowired(required = false) private S3FileSystemProperties s3FileSystemProperties; + @Autowired + private JarService jarService; public void handleJarArtifact(WsFlinkKubernetesJobDTO jobDTO, FlinkDeploymentSpec spec) { + List files = collectFiles(jobDTO); + if (CollectionUtils.isEmpty(files)) { + return; + } + PodBuilder podBuilder = Optional.ofNullable(spec.getPodTemplate()).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder()); handlePodTemplate(podBuilder); spec.setPodTemplate(podBuilder.build()); JobManagerSpec jobManager = Optional.ofNullable(spec.getJobManager()).orElse(new JobManagerSpec()); - handleJobManagerPodTemplate(jobDTO, jobManager); + handleJobManagerPodTemplate(jobDTO, jobManager, files); spec.setJobManager(jobManager); } @@ -61,60 +75,76 @@ private void handlePodTemplate(PodBuilder builder) { spec.addAllToVolumes(buildVolume()); // add volumes ContainerUtil.findFlinkMainContainer(spec) + .addAllToEnv(buildJavaOptsEnv()) .addAllToVolumeMounts(buildVolumeMount()) // add volume mount .endContainer(); spec.endSpec(); } - private void handleJobManagerPodTemplate(WsFlinkKubernetesJobDTO jobDTO, JobManagerSpec jobManager) { + private void handleJobManagerPodTemplate(WsFlinkKubernetesJobDTO jobDTO, JobManagerSpec jobManager, List files) { PodBuilder builder = Optional.of(jobManager).map(JobManagerSpec::getPodTemplate).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder()); - doHandle(jobDTO, builder); + doHandle(builder, files); jobManager.setPodTemplate(builder.build()); } - private void doHandle(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) { + private void doHandle(PodBuilder builder, List files) { builder.editOrNewMetadata() .withName(ResourceNames.JOB_MANAGER_POD_TEMPLATE_NAME) .endMetadata(); - addArtifactJar(jobDTO, builder); - addAdditionalJars(jobDTO, builder); + addFileFetcherInitContainers(builder, files); } - private void addArtifactJar(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) { + private List collectFiles(WsFlinkKubernetesJobDTO jobDTO) { + List result = new ArrayList<>(); + addArtifactJar(jobDTO, result); + addAdditionalJars(jobDTO, result); + return result; + } + + private void addArtifactJar(WsFlinkKubernetesJobDTO jobDTO, List result) { if (jobDTO.getFlinkArtifactJar() == null) { return; } switch (jobDTO.getDeploymentKind()) { case FLINK_DEPLOYMENT: - doAddJars(jobDTO.getFlinkArtifactJar(), builder); - return; + result.add(new FileFetcherParam(jobDTO.getFlinkArtifactJar().getPath(), ResourceNames.SCALEPH_JAR_DIRECTORY + jobDTO.getFlinkArtifactJar().getFileName())); + break; case FLINK_SESSION_JOB: + break; default: } } - private void addAdditionalJars(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) { + private void addAdditionalJars(WsFlinkKubernetesJobDTO jobDTO, List result) { switch (jobDTO.getDeploymentKind()) { case FLINK_DEPLOYMENT: - addAdditionalJars(); - return; + doAddAdditionalJars(jobDTO.getFlinkDeployment().getAdditionalDependencies(), result); + break; case FLINK_SESSION_JOB: + break; default: } } - private void doAddJars(WsFlinkArtifactJar jarArtifact, PodBuilder builder) { - builder.editOrNewSpec().addToInitContainers(addJarArtifact(jarArtifact)).endSpec(); + private void doAddAdditionalJars(List additionalDependencies, List result) { + for (Long jarId : additionalDependencies) { + JarDTO jarDTO = jarService.getRaw(jarId); + result.add(new FileFetcherParam(jarDTO.getPath(), ResourceNames.LIB_DIRECTORY + jarDTO.getFileName())); + } } - private Container addJarArtifact(WsFlinkArtifactJar jarArtifact) { + private void addFileFetcherInitContainers(PodBuilder builder, List files) { + builder.editOrNewSpec().addToInitContainers(buildInitContainer(files)).endSpec(); + } + + private Container buildInitContainer(List files) { ContainerBuilder builder = new ContainerBuilder(); builder.withName(ResourceNames.FILE_FETCHER_CONTAINER_NAME); builder.withImage(ResourceNames.FILE_FETCHER_CONTAINER_IMAGE); builder.withImagePullPolicy(ImagePullPolicy.IF_NOT_PRESENT.getValue()); - builder.withArgs(buildFileFetcherArgs(jarArtifact)); + builder.withArgs(buildFileFetcherArgs(files)); builder.withEnv(buildEnvs()); builder.withResources(buildResource()); builder.withVolumeMounts(buildVolumeMount()); @@ -123,13 +153,8 @@ private Container addJarArtifact(WsFlinkArtifactJar jarArtifact) { return builder.build(); } - private void addAdditionalJars() { - - } - - private List buildFileFetcherArgs(WsFlinkArtifactJar jarArtifact) { - return Arrays.asList("-uri", jarArtifact.getPath(), - "-path", ResourceNames.SCALEPH_JAR_DIRECTORY + jarArtifact.getFileName()); + private List buildFileFetcherArgs(List files) { + return Arrays.asList("-file-fetcher-json", JacksonUtil.toJsonString(files)); } private List buildEnvs() { @@ -152,6 +177,12 @@ private ResourceRequirements buildResource() { return resourceRequirementsBuilder.build(); } + private List buildJavaOptsEnv() { + EnvVarBuilder builder = new EnvVarBuilder(); + builder.withName(ENV_JAVA_OPTS_ALL_NAME); + builder.withValue(ResourceNames.LIB_DIRECTORY); + return Collections.singletonList(builder.build()); + } private List buildVolumeMount() { VolumeMountBuilder scalephLib = new VolumeMountBuilder(); @@ -175,4 +206,12 @@ private List buildVolume() { return Arrays.asList(scalephLib.build(), flinkLib.build()); } + @Getter + @Setter + @AllArgsConstructor + private static class FileFetcherParam { + private String uri; + private String path; + } + } diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkImageHandler.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkImageHandler.java new file mode 100644 index 000000000..f62f5af45 --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkImageHandler.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.kubernetes.resource.handler; + +import cn.sliew.scaleph.common.dict.flink.FlinkVersion; +import cn.sliew.scaleph.config.flink.FlinkImageType; +import cn.sliew.scaleph.config.flink.FlinkVersionConfig; +import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec; +import cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.job.instance.FlinkJobInstanceConverterFactory; +import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO; +import org.springframework.stereotype.Component; + +@Component +public class FlinkImageHandler { + + public void handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) { + FlinkVersion flinkVersion = FlinkJobInstanceConverterFactory.getFlinkVersion(jobInstanceDTO.getWsFlinkKubernetesJob()); + String image; + switch (jobInstanceDTO.getWsFlinkKubernetesJob().getType()) { + case JAR: + image = FlinkVersionConfig.findImage(FlinkImageType.JAR, flinkVersion.getValue()); + break; + case SQL: + image = FlinkVersionConfig.findImage(FlinkImageType.SQL, flinkVersion.getValue()); + break; + case SEATUNNEL: + image = FlinkVersionConfig.findImage(FlinkImageType.SEATUNNEL, flinkVersion.getValue()); + spec.setFlinkVersion(cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkVersion.v1_15); + break; + default: + image = FlinkVersionConfig.FLINK_JAR_1_17.getImage(); + } + spec.setImage(image); + } +} diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/FlinkKubernetesOperatorService.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/FlinkKubernetesOperatorService.java index 57c710326..111a11ddc 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/FlinkKubernetesOperatorService.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/FlinkKubernetesOperatorService.java @@ -39,4 +39,12 @@ public interface FlinkKubernetesOperatorService { void shutdownJob(Long clusterCredentialId, String job) throws Exception; + void restartJob(Long clusterCredentialId, String job) throws Exception; + + void triggerSavepoint(Long clusterCredentialId, String job) throws Exception; + + void suspendJob(Long clusterCredentialId, String job) throws Exception; + + void resumeJob(Long clusterCredentialId, String job) throws Exception; + } diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/WsFlinkKubernetesJobInstanceService.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/WsFlinkKubernetesJobInstanceService.java index f3a45834a..a7a8cdd9f 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/WsFlinkKubernetesJobInstanceService.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/WsFlinkKubernetesJobInstanceService.java @@ -48,6 +48,8 @@ public interface WsFlinkKubernetesJobInstanceService { Optional getStatusWithoutManagedFields(Long id); + Optional getJobWithoutStatus(Long id); + int updateStatus(Long id, FlinkDeploymentStatus status); int clearStatus(Long id); diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/FlinkKubernetesOperatorServiceImpl.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/FlinkKubernetesOperatorServiceImpl.java index a9333dd94..e981868ad 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/FlinkKubernetesOperatorServiceImpl.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/FlinkKubernetesOperatorServiceImpl.java @@ -102,4 +102,28 @@ public void shutdownJob(Long clusterCredentialId, String job) throws Exception { KubernetesClient client = kubernetesService.getClient(clusterCredentialId); client.load(new ByteArrayInputStream((job).getBytes())).delete(); } + + @Override + public void restartJob(Long clusterCredentialId, String job) throws Exception { + KubernetesClient client = kubernetesService.getClient(clusterCredentialId); + client.load(new ByteArrayInputStream((job).getBytes())).delete(); + } + + @Override + public void triggerSavepoint(Long clusterCredentialId, String job) throws Exception { + KubernetesClient client = kubernetesService.getClient(clusterCredentialId); + client.load(new ByteArrayInputStream((job).getBytes())).delete(); + } + + @Override + public void suspendJob(Long clusterCredentialId, String job) throws Exception { + KubernetesClient client = kubernetesService.getClient(clusterCredentialId); + client.load(new ByteArrayInputStream((job).getBytes())).delete(); + } + + @Override + public void resumeJob(Long clusterCredentialId, String job) throws Exception { + KubernetesClient client = kubernetesService.getClient(clusterCredentialId); + client.load(new ByteArrayInputStream((job).getBytes())).delete(); + } } diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java index 89668d1a2..84c83ba27 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java @@ -180,6 +180,18 @@ public Optional getStatusWithoutManagedFields(Long id return Optional.of(builder.build()); } + @Override + public Optional getJobWithoutStatus(Long id) { + Optional optional = getStatusWithoutManagedFields(id); + if (optional.isEmpty()) { + return Optional.empty(); + } + GenericKubernetesResource status = optional.get(); + GenericKubernetesResourceBuilder builder = new GenericKubernetesResourceBuilder(status); + builder.removeFromAdditionalProperties("status"); + return Optional.of(builder.build()); + } + @Override public int updateStatus(Long id, FlinkDeploymentStatus status) { if (status == null) { diff --git a/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/FetchOptions.java b/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/FetchOptions.java index bc43906e2..78ecd522b 100644 --- a/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/FetchOptions.java +++ b/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/FetchOptions.java @@ -18,27 +18,34 @@ package cn.sliew.scaleph.file.fetcher.cli; +import cn.sliew.milky.common.util.JacksonUtil; import lombok.Getter; +import lombok.Setter; import org.apache.commons.cli.CommandLine; -import java.net.URI; -import java.net.URISyntaxException; +import java.util.List; import java.util.Properties; -import static cn.sliew.scaleph.file.fetcher.cli.OptionsParser.*; +import static cn.sliew.scaleph.file.fetcher.cli.OptionsParser.DYNAMIC_PROPERTIES; +import static cn.sliew.scaleph.file.fetcher.cli.OptionsParser.FILE_FETCHER_JSON; @Getter public class FetchOptions extends CommandLineOptions { - private final URI uri; - private final String path; + private final List params; private final Properties properties; - public FetchOptions(CommandLine line) throws URISyntaxException { + public FetchOptions(CommandLine line) { super(line); - this.uri = new URI(line.getOptionValue(URI_OPTION)); - this.path = line.getOptionValue(PATH_OPTION); + this.params = JacksonUtil.parseJsonArray(line.getOptionValue(FILE_FETCHER_JSON), FileFetcherParam.class); this.properties = line.getOptionProperties(DYNAMIC_PROPERTIES); } + @Getter + @Setter + public static class FileFetcherParam { + private String uri; + private String path; + } + } diff --git a/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/OptionsParser.java b/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/OptionsParser.java index 22078f0e1..3af80aef0 100644 --- a/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/OptionsParser.java +++ b/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/OptionsParser.java @@ -34,6 +34,15 @@ public class OptionsParser { .desc("Show the help message for the CLI Frontend or the action.") .build(); + public static final Option FILE_FETCHER_JSON = + Option.builder() + .longOpt("file-fetcher-json") + .required(true) + .hasArg(true) + .argName("[{\"uri\": \"...\", \"path\": \"...\"}]") + .desc("file fetcher json.") + .build(); + public static final Option URI_OPTION = Option.builder() .option("uri") @@ -60,8 +69,7 @@ public class OptionsParser { private static Options buildGeneralOptions(Options options) { options.addOption(HELP_OPTION); - options.addOption(URI_OPTION); - options.addOption(PATH_OPTION); + options.addOption(FILE_FETCHER_JSON); options.addOption(DYNAMIC_PROPERTIES); return options; } @@ -93,8 +101,7 @@ public static void printHelpForInfo() { public static CommandLine parse(String[] args, boolean stopAtNonOptions) throws CliArgsException { Options options = new Options() - .addOption(URI_OPTION) - .addOption(PATH_OPTION) + .addOption(FILE_FETCHER_JSON) .addOption(DYNAMIC_PROPERTIES); DefaultParser parser = new DefaultParser(); diff --git a/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/executor/FetcherExecutor.java b/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/executor/FetcherExecutor.java index 135e01912..a0fa656aa 100644 --- a/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/executor/FetcherExecutor.java +++ b/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/executor/FetcherExecutor.java @@ -30,8 +30,11 @@ import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; +import java.io.IOException; +import java.net.URI; import java.util.Arrays; import java.util.Optional; +import java.util.Properties; @Slf4j @Component @@ -42,14 +45,16 @@ public class FetcherExecutor implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { - try { - log.info("命令行参数: {}", JacksonUtil.toJsonString(Arrays.asList(args.getSourceArgs()))); - CommandLine line = OptionsParser.parse(args.getSourceArgs(), true); - FetchOptions options = new FetchOptions(line); - Optional fileFetcher = fileFetcherFactory.find(options.getUri(), options.getProperties()); - fileFetcher.orElseThrow().fetch(options.getUri(), options.getPath()); - } catch (Exception e) { - log.error("下载文件异常! 参数: {}", JacksonUtil.toJsonString(Arrays.asList(args.getSourceArgs())), e); + log.info("命令行参数: {}", JacksonUtil.toJsonString(Arrays.asList(args.getSourceArgs()))); + CommandLine line = OptionsParser.parse(args.getSourceArgs(), true); + FetchOptions options = new FetchOptions(line); + for (FetchOptions.FileFetcherParam param : options.getParams()) { + doFetch(new URI(param.getUri()), param.getPath(), options.getProperties()); } } + + private void doFetch(URI uri, String path, Properties properties) throws IOException { + Optional fileFetcher = fileFetcherFactory.find(uri, properties); + fileFetcher.orElseThrow().fetch(uri, path); + } } diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx index a9a89320c..751e74a54 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx @@ -98,7 +98,7 @@ const FlinkKubernetesJobDetailWeb: React.FC = (props: any) => { render: () => [
,