From f28afe8f23d81171bd6052666bea5b978f7f7d49 Mon Sep 17 00:00:00 2001 From: wangqi <1942460489@qq.com> Date: Sat, 15 Jul 2023 08:53:02 +0800 Subject: [PATCH 1/7] feature: update flink kubernetes job instance service --- ...FlinkKubernetesJobInstanceServiceImpl.java | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java index 1ad98ce8b..75b53eb7e 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java @@ -22,9 +22,10 @@ import cn.sliew.scaleph.common.util.UUIDUtil; import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstance; import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceMapper; +import cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.job.FlinkDeploymentJobConverter; +import cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.job.FlinkSessionJobConverter; import cn.sliew.scaleph.engine.flink.kubernetes.service.FlinkKubernetesOperatorService; import cn.sliew.scaleph.engine.flink.kubernetes.service.WsFlinkKubernetesJobInstanceService; -import cn.sliew.scaleph.engine.flink.kubernetes.service.WsFlinkKubernetesJobService; import cn.sliew.scaleph.engine.flink.kubernetes.service.convert.WsFlinkKubernetesJobInstanceConvert; import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobDTO; import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO; @@ -48,9 +49,9 @@ public class WsFlinkKubernetesJobInstanceServiceImpl implements WsFlinkKubernete @Autowired private WsFlinkKubernetesJobInstanceMapper wsFlinkKubernetesJobInstanceMapper; @Autowired - private WsFlinkKubernetesJobService wsFlinkKubernetesJobService; - @Autowired private FlinkKubernetesOperatorService flinkKubernetesOperatorService; + @Autowired + private FlinkDeploymentJobConverter flinkDeploymentJobConverter; @Override public Page list(WsFlinkKubernetesJobInstanceListParam param) { @@ -74,12 +75,30 @@ public WsFlinkKubernetesJobInstanceDTO selectOne(Long id) { @Override public WsFlinkKubernetesJobInstanceDTO selectCurrent(Long wsFlinkKubernetesJobId) { + LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(WsFlinkKubernetesJobInstance.class) + .eq(WsFlinkKubernetesJobInstance::getWsFlinkKubernetesJobId, wsFlinkKubernetesJobId) + .orderByDesc(WsFlinkKubernetesJobInstance::getId) + .last("limit 1"); + + WsFlinkKubernetesJobInstance record = wsFlinkKubernetesJobInstanceMapper.selectOne(queryWrapper); + if (record != null) { + return WsFlinkKubernetesJobInstanceConvert.INSTANCE.toDto(record); + } return null; } @Override public Object asYaml(Long id) throws Exception { - return null; + WsFlinkKubernetesJobInstanceDTO jobInstanceDTO = selectOne(id); + WsFlinkKubernetesJobDTO jobDTO = jobInstanceDTO.getWsFlinkKubernetesJob(); + switch (jobDTO.getDeploymentKind()) { + case FLINK_DEPLOYMENT: + return flinkDeploymentJobConverter.convertTo(jobDTO); + case FLINK_SESSION_JOB: + return FlinkSessionJobConverter.INSTANCE.convertTo(jobDTO); + default: + throw new RuntimeException("unsupport flink deployment mode for " + jobDTO.getDeploymentKind()); + } } @Override @@ -97,7 +116,8 @@ public void deploy(WsFlinkKubernetesJobInstanceDeployParam param) throws Excepti record.setUserFlinkConfiguration(JacksonUtil.toJsonString(param.getUserFlinkConfiguration())); } wsFlinkKubernetesJobInstanceMapper.insert(record); - WsFlinkKubernetesJobDTO jobDTO = wsFlinkKubernetesJobService.selectOne(param.getWsFlinkKubernetesJobId()); + WsFlinkKubernetesJobInstanceDTO jobInstanceDTO = selectOne(record.getId()); + WsFlinkKubernetesJobDTO jobDTO = jobInstanceDTO.getWsFlinkKubernetesJob(); Object yaml = asYaml(record.getId()); switch (jobDTO.getDeploymentKind()) { case FLINK_DEPLOYMENT: @@ -113,7 +133,7 @@ public void deploy(WsFlinkKubernetesJobInstanceDeployParam param) throws Excepti @Override public void shutdown(WsFlinkKubernetesJobInstanceShutdownParam param) throws Exception { WsFlinkKubernetesJobInstanceDTO jobInstanceDTO = selectOne(param.getId()); - WsFlinkKubernetesJobDTO jobDTO = wsFlinkKubernetesJobService.selectOne(jobInstanceDTO.getWsFlinkKubernetesJobId()); + WsFlinkKubernetesJobDTO jobDTO = jobInstanceDTO.getWsFlinkKubernetesJob(); Object yaml = asYaml(param.getId()); switch (jobDTO.getDeploymentKind()) { case FLINK_DEPLOYMENT: From 4d69d6531b4090eb52be0587bd0567b4fcabffa4 Mon Sep 17 00:00:00 2001 From: wangqi <1942460489@qq.com> Date: Tue, 25 Jul 2023 17:22:35 +0800 Subject: [PATCH 2/7] feature: update flink image --- .../config/flink/FlinkVersionConfig.java | 8 +-- .../instance/FlinkDeploymentSpecHandler.java | 8 +++ .../resource/handler/FlinkImageHandler.java | 51 +++++++++++++++++++ 3 files changed, 63 insertions(+), 4 deletions(-) create mode 100644 scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkImageHandler.java diff --git a/scaleph-config/src/main/java/cn/sliew/scaleph/config/flink/FlinkVersionConfig.java b/scaleph-config/src/main/java/cn/sliew/scaleph/config/flink/FlinkVersionConfig.java index 5284cd5f5..607ace436 100644 --- a/scaleph-config/src/main/java/cn/sliew/scaleph/config/flink/FlinkVersionConfig.java +++ b/scaleph-config/src/main/java/cn/sliew/scaleph/config/flink/FlinkVersionConfig.java @@ -42,20 +42,20 @@ public enum FlinkVersionConfig { FlinkImageType.SQL, "1.16", Arrays.asList("1.16.0", "1.16.1", "1.16.2"), - "ghcr.io/flowerfine/scaleph/scaleph-sql-templates:1.16"); + "ghcr.io/flowerfine/scaleph/scaleph-sql-template:1.16"); public static final FlinkVersionProperties FLINK_SQL_1_17 = new FlinkVersionProperties( FlinkImageType.SQL, "1.17", Arrays.asList("1.17.0", "1.17.1"), - "ghcr.io/flowerfine/scaleph/scaleph-sql-templates:1.17"); + "ghcr.io/flowerfine/scaleph/scaleph-sql-template:1.17"); public static final FlinkVersionProperties FLINK_SEATUNNEL_1_15 = new FlinkVersionProperties( FlinkImageType.SEATUNNEL, "1.15", - Arrays.asList("1.16.0", "1.16.1", "1.16.2"), + Arrays.asList("1.15.0", "1.15.1", "1.15.2", "1.15.3", "1.15.4"), "ghcr.io/flowerfine/scaleph-seatunnel:2.3.1-flink-1.15"); public static String findImage(FlinkImageType type, String flinkVersion) { @@ -67,7 +67,7 @@ public static String findImage(FlinkImageType type, String flinkVersion) { case SEATUNNEL: return doFindImage(flinkVersion, FLINK_SEATUNNEL_1_15.getImage(), FLINK_SEATUNNEL_1_15); default: - return null; + return FLINK_JAR_1_17.getImage(); } } diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java index c5768413f..ea3dc346d 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java @@ -20,6 +20,7 @@ import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec; import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FileSystemPluginHandler; +import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkImageHandler; import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkJobServiceHandler; import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkStateStorageHandler; import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO; @@ -39,6 +40,8 @@ public class FlinkDeploymentSpecHandler { private FlinkStateStorageHandler flinkStateStorageHandler; @Autowired private FlinkJobServiceHandler flinkJobServiceHandler; + @Autowired + private FlinkImageHandler flinkImageHandler; public FlinkDeploymentSpec handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec flinkDeploymentSpec) { FlinkDeploymentSpec spec = Optional.ofNullable(flinkDeploymentSpec).orElse(new FlinkDeploymentSpec()); @@ -46,6 +49,7 @@ public FlinkDeploymentSpec handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO enableFileSystem(jobInstanceDTO, spec); enableFlinkStateStore(jobInstanceDTO, spec); addService(spec); + addImage(jobInstanceDTO, spec); return spec; } @@ -64,4 +68,8 @@ private void enableFlinkStateStore(WsFlinkKubernetesJobInstanceDTO jobInstanceDT private void addService(FlinkDeploymentSpec spec) { flinkJobServiceHandler.handle(spec); } + + private void addImage(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) { + flinkImageHandler.handle(jobInstanceDTO, spec); + } } diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkImageHandler.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkImageHandler.java new file mode 100644 index 000000000..f62f5af45 --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FlinkImageHandler.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.kubernetes.resource.handler; + +import cn.sliew.scaleph.common.dict.flink.FlinkVersion; +import cn.sliew.scaleph.config.flink.FlinkImageType; +import cn.sliew.scaleph.config.flink.FlinkVersionConfig; +import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec; +import cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.job.instance.FlinkJobInstanceConverterFactory; +import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO; +import org.springframework.stereotype.Component; + +@Component +public class FlinkImageHandler { + + public void handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) { + FlinkVersion flinkVersion = FlinkJobInstanceConverterFactory.getFlinkVersion(jobInstanceDTO.getWsFlinkKubernetesJob()); + String image; + switch (jobInstanceDTO.getWsFlinkKubernetesJob().getType()) { + case JAR: + image = FlinkVersionConfig.findImage(FlinkImageType.JAR, flinkVersion.getValue()); + break; + case SQL: + image = FlinkVersionConfig.findImage(FlinkImageType.SQL, flinkVersion.getValue()); + break; + case SEATUNNEL: + image = FlinkVersionConfig.findImage(FlinkImageType.SEATUNNEL, flinkVersion.getValue()); + spec.setFlinkVersion(cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkVersion.v1_15); + break; + default: + image = FlinkVersionConfig.FLINK_JAR_1_17.getImage(); + } + spec.setImage(image); + } +} From 943a09dfc2547fe318d1d63ee2a8bba6586218b2 Mon Sep 17 00:00:00 2001 From: wangqi <1942460489@qq.com> Date: Tue, 25 Jul 2023 18:06:34 +0800 Subject: [PATCH 3/7] feature: add restart and triggersavepoint support --- .../FlinkKubernetesOperatorService.java | 8 +++++++ .../WsFlinkKubernetesJobInstanceService.java | 2 ++ .../FlinkKubernetesOperatorServiceImpl.java | 24 +++++++++++++++++++ ...FlinkKubernetesJobInstanceServiceImpl.java | 12 ++++++++++ 4 files changed, 46 insertions(+) diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/FlinkKubernetesOperatorService.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/FlinkKubernetesOperatorService.java index 57c710326..111a11ddc 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/FlinkKubernetesOperatorService.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/FlinkKubernetesOperatorService.java @@ -39,4 +39,12 @@ public interface FlinkKubernetesOperatorService { void shutdownJob(Long clusterCredentialId, String job) throws Exception; + void restartJob(Long clusterCredentialId, String job) throws Exception; + + void triggerSavepoint(Long clusterCredentialId, String job) throws Exception; + + void suspendJob(Long clusterCredentialId, String job) throws Exception; + + void resumeJob(Long clusterCredentialId, String job) throws Exception; + } diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/WsFlinkKubernetesJobInstanceService.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/WsFlinkKubernetesJobInstanceService.java index f3a45834a..a7a8cdd9f 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/WsFlinkKubernetesJobInstanceService.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/WsFlinkKubernetesJobInstanceService.java @@ -48,6 +48,8 @@ public interface WsFlinkKubernetesJobInstanceService { Optional getStatusWithoutManagedFields(Long id); + Optional getJobWithoutStatus(Long id); + int updateStatus(Long id, FlinkDeploymentStatus status); int clearStatus(Long id); diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/FlinkKubernetesOperatorServiceImpl.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/FlinkKubernetesOperatorServiceImpl.java index a9333dd94..e981868ad 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/FlinkKubernetesOperatorServiceImpl.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/FlinkKubernetesOperatorServiceImpl.java @@ -102,4 +102,28 @@ public void shutdownJob(Long clusterCredentialId, String job) throws Exception { KubernetesClient client = kubernetesService.getClient(clusterCredentialId); client.load(new ByteArrayInputStream((job).getBytes())).delete(); } + + @Override + public void restartJob(Long clusterCredentialId, String job) throws Exception { + KubernetesClient client = kubernetesService.getClient(clusterCredentialId); + client.load(new ByteArrayInputStream((job).getBytes())).delete(); + } + + @Override + public void triggerSavepoint(Long clusterCredentialId, String job) throws Exception { + KubernetesClient client = kubernetesService.getClient(clusterCredentialId); + client.load(new ByteArrayInputStream((job).getBytes())).delete(); + } + + @Override + public void suspendJob(Long clusterCredentialId, String job) throws Exception { + KubernetesClient client = kubernetesService.getClient(clusterCredentialId); + client.load(new ByteArrayInputStream((job).getBytes())).delete(); + } + + @Override + public void resumeJob(Long clusterCredentialId, String job) throws Exception { + KubernetesClient client = kubernetesService.getClient(clusterCredentialId); + client.load(new ByteArrayInputStream((job).getBytes())).delete(); + } } diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java index 89668d1a2..84c83ba27 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java @@ -180,6 +180,18 @@ public Optional getStatusWithoutManagedFields(Long id return Optional.of(builder.build()); } + @Override + public Optional getJobWithoutStatus(Long id) { + Optional optional = getStatusWithoutManagedFields(id); + if (optional.isEmpty()) { + return Optional.empty(); + } + GenericKubernetesResource status = optional.get(); + GenericKubernetesResourceBuilder builder = new GenericKubernetesResourceBuilder(status); + builder.removeFromAdditionalProperties("status"); + return Optional.of(builder.build()); + } + @Override public int updateStatus(Long id, FlinkDeploymentStatus status) { if (status == null) { From 0ce9072ec595860df69b89a5f9f0106299fa6ef3 Mon Sep 17 00:00:00 2001 From: wangqi <1942460489@qq.com> Date: Fri, 28 Jul 2023 11:51:49 +0800 Subject: [PATCH 4/7] feature: merge job instance configuration --- .../instance/FlinkDeploymentSpecHandler.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java index ea3dc346d..6c4222ee2 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java @@ -18,15 +18,18 @@ package cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.job.instance; -import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec; +import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.*; +import cn.sliew.scaleph.engine.flink.kubernetes.operator.util.TemplateMerger; import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FileSystemPluginHandler; import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkImageHandler; import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkJobServiceHandler; import cn.sliew.scaleph.engine.flink.kubernetes.resource.handler.FlinkStateStorageHandler; import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO; +import org.apache.commons.lang3.EnumUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.Map; import java.util.Optional; @Component @@ -50,6 +53,8 @@ public FlinkDeploymentSpec handle(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO enableFlinkStateStore(jobInstanceDTO, spec); addService(spec); addImage(jobInstanceDTO, spec); + + mergeJobInstance(jobInstanceDTO, spec); return spec; } @@ -72,4 +77,14 @@ private void addService(FlinkDeploymentSpec spec) { private void addImage(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) { flinkImageHandler.handle(jobInstanceDTO, spec); } + + private void mergeJobInstance(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) { + spec.setJobManager(TemplateMerger.merge(spec.getJobManager(), jobInstanceDTO.getJobManager(), JobManagerSpec.class)); + spec.setTaskManager(TemplateMerger.merge(spec.getTaskManager(), jobInstanceDTO.getTaskManager(), TaskManagerSpec.class)); + spec.setFlinkConfiguration(TemplateMerger.merge(spec.getFlinkConfiguration(), jobInstanceDTO.getUserFlinkConfiguration(), Map.class)); + JobSpec job = spec.getJob(); + job.setParallelism(jobInstanceDTO.getParallelism()); + job.setUpgradeMode(EnumUtils.getEnum(UpgradeMode.class, jobInstanceDTO.getUpgradeMode().name())); + job.setAllowNonRestoredState(jobInstanceDTO.getAllowNonRestoredState()); + } } From 27917424f7a8ba941d6a1d233f404f79d600a578 Mon Sep 17 00:00:00 2001 From: wangqi <1942460489@qq.com> Date: Sun, 30 Jul 2023 16:39:42 +0800 Subject: [PATCH 5/7] feature: add additional dependencies --- .../config/resource/ResourceNames.java | 2 +- .../instance/FlinkDeploymentSpecHandler.java | 8 +- .../resource/handler/FileFetcherHandler.java | 80 +++++++++++++------ .../file/fetcher/cli/FetchOptions.java | 23 ++++-- .../file/fetcher/cli/OptionsParser.java | 15 +++- .../fetcher/executor/FetcherExecutor.java | 21 +++-- .../Workspace/Kubernetes/Job/Detail/index.tsx | 2 +- 7 files changed, 102 insertions(+), 49 deletions(-) diff --git a/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java b/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java index e6cae6e68..c732de5a6 100644 --- a/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java +++ b/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java @@ -30,7 +30,7 @@ public enum ResourceNames { public static final String JOB_MANAGER_POD_TEMPLATE_NAME = "task-manager-pod-template"; public static final String TASK_MANAGER_POD_TEMPLATE_NAME = "task-manager-pod-template"; - public static final String FILE_FETCHER_CONTAINER_IMAGE = "ghcr.io/flowerfine/scaleph/scaleph-file-fetcher:latest"; + public static final String FILE_FETCHER_CONTAINER_IMAGE = "scaleph-file-fetcher:dev"; public static final String SQL_TEMPLATE_IMAGE = "ghcr.io/flowerfine/scaleph-sql-template:1.17"; public static final String SCALEPH_SEATUNNEL_IMAGE = "ghcr.io/flowerfine/scaleph-seatunnel:2.3.1-flink-1.15"; diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java index 6c4222ee2..d62dbb395 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/definition/job/instance/FlinkDeploymentSpecHandler.java @@ -83,8 +83,12 @@ private void mergeJobInstance(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, Fl spec.setTaskManager(TemplateMerger.merge(spec.getTaskManager(), jobInstanceDTO.getTaskManager(), TaskManagerSpec.class)); spec.setFlinkConfiguration(TemplateMerger.merge(spec.getFlinkConfiguration(), jobInstanceDTO.getUserFlinkConfiguration(), Map.class)); JobSpec job = spec.getJob(); - job.setParallelism(jobInstanceDTO.getParallelism()); - job.setUpgradeMode(EnumUtils.getEnum(UpgradeMode.class, jobInstanceDTO.getUpgradeMode().name())); + if (jobInstanceDTO.getParallelism() != null) { + job.setParallelism(jobInstanceDTO.getParallelism()); + } + if (jobInstanceDTO.getUpgradeMode() != null) { + job.setUpgradeMode(EnumUtils.getEnum(UpgradeMode.class, jobInstanceDTO.getUpgradeMode().name())); + } job.setAllowNonRestoredState(jobInstanceDTO.getAllowNonRestoredState()); } } diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FileFetcherHandler.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FileFetcherHandler.java index 884a813d1..71ea1eaf4 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FileFetcherHandler.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FileFetcherHandler.java @@ -18,17 +18,22 @@ package cn.sliew.scaleph.engine.flink.kubernetes.resource.handler; +import cn.sliew.milky.common.util.JacksonUtil; import cn.sliew.scaleph.common.dict.image.ImagePullPolicy; import cn.sliew.scaleph.config.resource.ResourceNames; import cn.sliew.scaleph.config.storage.S3FileSystemProperties; -import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkArtifactJar; import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec; import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.JobManagerSpec; -import cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.job.FlinkDeploymentJob; import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobDTO; +import cn.sliew.scaleph.resource.service.JarService; +import cn.sliew.scaleph.resource.service.dto.JarDTO; import io.fabric8.kubernetes.api.model.*; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import java.util.*; @@ -42,14 +47,21 @@ public class FileFetcherHandler { @Autowired(required = false) private S3FileSystemProperties s3FileSystemProperties; + @Autowired + private JarService jarService; public void handleJarArtifact(WsFlinkKubernetesJobDTO jobDTO, FlinkDeploymentSpec spec) { + List files = collectFiles(jobDTO); + if (CollectionUtils.isEmpty(files)) { + return; + } + PodBuilder podBuilder = Optional.ofNullable(spec.getPodTemplate()).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder()); handlePodTemplate(podBuilder); spec.setPodTemplate(podBuilder.build()); JobManagerSpec jobManager = Optional.ofNullable(spec.getJobManager()).orElse(new JobManagerSpec()); - handleJobManagerPodTemplate(jobDTO, jobManager); + handleJobManagerPodTemplate(jobDTO, jobManager, files); spec.setJobManager(jobManager); } @@ -67,54 +79,69 @@ private void handlePodTemplate(PodBuilder builder) { spec.endSpec(); } - private void handleJobManagerPodTemplate(WsFlinkKubernetesJobDTO jobDTO, JobManagerSpec jobManager) { + private void handleJobManagerPodTemplate(WsFlinkKubernetesJobDTO jobDTO, JobManagerSpec jobManager, List files) { PodBuilder builder = Optional.of(jobManager).map(JobManagerSpec::getPodTemplate).map(pod -> new PodBuilder(pod)).orElse(new PodBuilder()); - doHandle(jobDTO, builder); + doHandle(builder, files); jobManager.setPodTemplate(builder.build()); } - private void doHandle(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) { + private void doHandle(PodBuilder builder, List files) { builder.editOrNewMetadata() .withName(ResourceNames.JOB_MANAGER_POD_TEMPLATE_NAME) .endMetadata(); - addArtifactJar(jobDTO, builder); - addAdditionalJars(jobDTO, builder); + addFileFetcherInitContainers(builder, files); } - private void addArtifactJar(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) { + private List collectFiles(WsFlinkKubernetesJobDTO jobDTO) { + List result = new ArrayList<>(); + addArtifactJar(jobDTO, result); + addAdditionalJars(jobDTO, result); + return result; + } + + private void addArtifactJar(WsFlinkKubernetesJobDTO jobDTO, List result) { if (jobDTO.getFlinkArtifactJar() == null) { return; } switch (jobDTO.getDeploymentKind()) { case FLINK_DEPLOYMENT: - doAddJars(jobDTO.getFlinkArtifactJar(), builder); - return; + result.add(new FileFetcherParam(jobDTO.getFlinkArtifactJar().getPath(), ResourceNames.SCALEPH_JAR_DIRECTORY + jobDTO.getFlinkArtifactJar().getFileName())); + break; case FLINK_SESSION_JOB: + break; default: } } - private void addAdditionalJars(WsFlinkKubernetesJobDTO jobDTO, PodBuilder builder) { + private void addAdditionalJars(WsFlinkKubernetesJobDTO jobDTO, List result) { switch (jobDTO.getDeploymentKind()) { case FLINK_DEPLOYMENT: - addAdditionalJars(); - return; + doAddAdditionalJars(jobDTO.getFlinkDeployment().getAdditionalDependencies(), result); + break; case FLINK_SESSION_JOB: + break; default: } } - private void doAddJars(WsFlinkArtifactJar jarArtifact, PodBuilder builder) { - builder.editOrNewSpec().addToInitContainers(addJarArtifact(jarArtifact)).endSpec(); + private void doAddAdditionalJars(List additionalDependencies, List result) { + for (Long jarId : additionalDependencies) { + JarDTO jarDTO = jarService.getRaw(jarId); + result.add(new FileFetcherParam(jarDTO.getPath(), ResourceNames.LIB_DIRECTORY + jarDTO.getFileName())); + } } - private Container addJarArtifact(WsFlinkArtifactJar jarArtifact) { + private void addFileFetcherInitContainers(PodBuilder builder, List files) { + builder.editOrNewSpec().addToInitContainers(buildInitContainer(files)).endSpec(); + } + + private Container buildInitContainer(List files) { ContainerBuilder builder = new ContainerBuilder(); builder.withName(ResourceNames.FILE_FETCHER_CONTAINER_NAME); builder.withImage(ResourceNames.FILE_FETCHER_CONTAINER_IMAGE); builder.withImagePullPolicy(ImagePullPolicy.IF_NOT_PRESENT.getValue()); - builder.withArgs(buildFileFetcherArgs(jarArtifact)); + builder.withArgs(buildFileFetcherArgs(files)); builder.withEnv(buildEnvs()); builder.withResources(buildResource()); builder.withVolumeMounts(buildVolumeMount()); @@ -123,13 +150,8 @@ private Container addJarArtifact(WsFlinkArtifactJar jarArtifact) { return builder.build(); } - private void addAdditionalJars() { - - } - - private List buildFileFetcherArgs(WsFlinkArtifactJar jarArtifact) { - return Arrays.asList("-uri", jarArtifact.getPath(), - "-path", ResourceNames.SCALEPH_JAR_DIRECTORY + jarArtifact.getFileName()); + private List buildFileFetcherArgs(List files) { + return Arrays.asList("-file-fetcher-json", JacksonUtil.toJsonString(files)); } private List buildEnvs() { @@ -175,4 +197,12 @@ private List buildVolume() { return Arrays.asList(scalephLib.build(), flinkLib.build()); } + @Getter + @Setter + @AllArgsConstructor + private static class FileFetcherParam { + private String uri; + private String path; + } + } diff --git a/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/FetchOptions.java b/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/FetchOptions.java index bc43906e2..78ecd522b 100644 --- a/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/FetchOptions.java +++ b/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/FetchOptions.java @@ -18,27 +18,34 @@ package cn.sliew.scaleph.file.fetcher.cli; +import cn.sliew.milky.common.util.JacksonUtil; import lombok.Getter; +import lombok.Setter; import org.apache.commons.cli.CommandLine; -import java.net.URI; -import java.net.URISyntaxException; +import java.util.List; import java.util.Properties; -import static cn.sliew.scaleph.file.fetcher.cli.OptionsParser.*; +import static cn.sliew.scaleph.file.fetcher.cli.OptionsParser.DYNAMIC_PROPERTIES; +import static cn.sliew.scaleph.file.fetcher.cli.OptionsParser.FILE_FETCHER_JSON; @Getter public class FetchOptions extends CommandLineOptions { - private final URI uri; - private final String path; + private final List params; private final Properties properties; - public FetchOptions(CommandLine line) throws URISyntaxException { + public FetchOptions(CommandLine line) { super(line); - this.uri = new URI(line.getOptionValue(URI_OPTION)); - this.path = line.getOptionValue(PATH_OPTION); + this.params = JacksonUtil.parseJsonArray(line.getOptionValue(FILE_FETCHER_JSON), FileFetcherParam.class); this.properties = line.getOptionProperties(DYNAMIC_PROPERTIES); } + @Getter + @Setter + public static class FileFetcherParam { + private String uri; + private String path; + } + } diff --git a/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/OptionsParser.java b/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/OptionsParser.java index 22078f0e1..3af80aef0 100644 --- a/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/OptionsParser.java +++ b/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/cli/OptionsParser.java @@ -34,6 +34,15 @@ public class OptionsParser { .desc("Show the help message for the CLI Frontend or the action.") .build(); + public static final Option FILE_FETCHER_JSON = + Option.builder() + .longOpt("file-fetcher-json") + .required(true) + .hasArg(true) + .argName("[{\"uri\": \"...\", \"path\": \"...\"}]") + .desc("file fetcher json.") + .build(); + public static final Option URI_OPTION = Option.builder() .option("uri") @@ -60,8 +69,7 @@ public class OptionsParser { private static Options buildGeneralOptions(Options options) { options.addOption(HELP_OPTION); - options.addOption(URI_OPTION); - options.addOption(PATH_OPTION); + options.addOption(FILE_FETCHER_JSON); options.addOption(DYNAMIC_PROPERTIES); return options; } @@ -93,8 +101,7 @@ public static void printHelpForInfo() { public static CommandLine parse(String[] args, boolean stopAtNonOptions) throws CliArgsException { Options options = new Options() - .addOption(URI_OPTION) - .addOption(PATH_OPTION) + .addOption(FILE_FETCHER_JSON) .addOption(DYNAMIC_PROPERTIES); DefaultParser parser = new DefaultParser(); diff --git a/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/executor/FetcherExecutor.java b/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/executor/FetcherExecutor.java index 135e01912..a0fa656aa 100644 --- a/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/executor/FetcherExecutor.java +++ b/scaleph-file-fetcher/src/main/java/cn/sliew/scaleph/file/fetcher/executor/FetcherExecutor.java @@ -30,8 +30,11 @@ import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; +import java.io.IOException; +import java.net.URI; import java.util.Arrays; import java.util.Optional; +import java.util.Properties; @Slf4j @Component @@ -42,14 +45,16 @@ public class FetcherExecutor implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { - try { - log.info("命令行参数: {}", JacksonUtil.toJsonString(Arrays.asList(args.getSourceArgs()))); - CommandLine line = OptionsParser.parse(args.getSourceArgs(), true); - FetchOptions options = new FetchOptions(line); - Optional fileFetcher = fileFetcherFactory.find(options.getUri(), options.getProperties()); - fileFetcher.orElseThrow().fetch(options.getUri(), options.getPath()); - } catch (Exception e) { - log.error("下载文件异常! 参数: {}", JacksonUtil.toJsonString(Arrays.asList(args.getSourceArgs())), e); + log.info("命令行参数: {}", JacksonUtil.toJsonString(Arrays.asList(args.getSourceArgs()))); + CommandLine line = OptionsParser.parse(args.getSourceArgs(), true); + FetchOptions options = new FetchOptions(line); + for (FetchOptions.FileFetcherParam param : options.getParams()) { + doFetch(new URI(param.getUri()), param.getPath(), options.getProperties()); } } + + private void doFetch(URI uri, String path, Properties properties) throws IOException { + Optional fileFetcher = fileFetcherFactory.find(uri, properties); + fileFetcher.orElseThrow().fetch(uri, path); + } } diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx index a9a89320c..751e74a54 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx @@ -98,7 +98,7 @@ const FlinkKubernetesJobDetailWeb: React.FC = (props: any) => { render: () => [
, From 1afc7e395345f49539e62277e3d8bfbb29ed8393 Mon Sep 17 00:00:00 2001 From: wangqi <1942460489@qq.com> Date: Sun, 30 Jul 2023 17:19:01 +0800 Subject: [PATCH 6/7] feature: add additional dependencies CLASSPATH env --- .../cn/sliew/scaleph/config/resource/ResourceNames.java | 2 +- .../kubernetes/resource/handler/FileFetcherHandler.java | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java b/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java index c732de5a6..3e473cab1 100644 --- a/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java +++ b/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java @@ -39,7 +39,7 @@ public enum ResourceNames { public static final String JAR_LOCAL_PATH = LOCAL_SCHEMA + SCALEPH_JAR_DIRECTORY; public static final String FILE_FETCHER_FLINK_VOLUME_NAME = "file-fetcher-flink-volume"; - public static final String LIB_DIRECTORY = "/flink/usrlib/"; + public static final String LIB_DIRECTORY = "/scaleph/usrlib/"; public static final String LIB_LOCAL_PATH = LOCAL_SCHEMA + LIB_DIRECTORY; public static final String SQL_DIRECTORY = "/scaleph/sql/"; diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FileFetcherHandler.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FileFetcherHandler.java index 71ea1eaf4..1fc869a44 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FileFetcherHandler.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/resource/handler/FileFetcherHandler.java @@ -45,6 +45,8 @@ public class FileFetcherHandler { private static final String ENDPOINT = "MINIO_ENDPOINT"; + private static final String ENV_JAVA_OPTS_ALL_NAME = "CLASSPATH"; + @Autowired(required = false) private S3FileSystemProperties s3FileSystemProperties; @Autowired @@ -73,6 +75,7 @@ private void handlePodTemplate(PodBuilder builder) { spec.addAllToVolumes(buildVolume()); // add volumes ContainerUtil.findFlinkMainContainer(spec) + .addAllToEnv(buildJavaOptsEnv()) .addAllToVolumeMounts(buildVolumeMount()) // add volume mount .endContainer(); @@ -174,6 +177,12 @@ private ResourceRequirements buildResource() { return resourceRequirementsBuilder.build(); } + private List buildJavaOptsEnv() { + EnvVarBuilder builder = new EnvVarBuilder(); + builder.withName(ENV_JAVA_OPTS_ALL_NAME); + builder.withValue(ResourceNames.LIB_DIRECTORY); + return Collections.singletonList(builder.build()); + } private List buildVolumeMount() { VolumeMountBuilder scalephLib = new VolumeMountBuilder(); From 9e3df1226f443c6eadddffac8098098ef95ae6d3 Mon Sep 17 00:00:00 2001 From: wangqi <1942460489@qq.com> Date: Sun, 30 Jul 2023 17:20:06 +0800 Subject: [PATCH 7/7] feature: add additional dependencies CLASSPATH env --- .../java/cn/sliew/scaleph/config/resource/ResourceNames.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java b/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java index 3e473cab1..e0bfd0094 100644 --- a/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java +++ b/scaleph-config/src/main/java/cn/sliew/scaleph/config/resource/ResourceNames.java @@ -30,7 +30,7 @@ public enum ResourceNames { public static final String JOB_MANAGER_POD_TEMPLATE_NAME = "task-manager-pod-template"; public static final String TASK_MANAGER_POD_TEMPLATE_NAME = "task-manager-pod-template"; - public static final String FILE_FETCHER_CONTAINER_IMAGE = "scaleph-file-fetcher:dev"; + public static final String FILE_FETCHER_CONTAINER_IMAGE = "ghcr.io/flowerfine/scaleph/scaleph-file-fetcher:latest"; public static final String SQL_TEMPLATE_IMAGE = "ghcr.io/flowerfine/scaleph-sql-template:1.17"; public static final String SCALEPH_SEATUNNEL_IMAGE = "ghcr.io/flowerfine/scaleph-seatunnel:2.3.1-flink-1.15";