Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][scaleph-ui-react] update flink kubernetes job detail web #698

Merged
merged 6 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private Info apiInfo() {
return new Info()
.title("Scaleph API文档")
.description("Scaleph API文档")
.version("1.0.5-SNAPSHOT")
.version("2.0.3-SNAPSHOT")
.termsOfService("https://flowerfine.github.io/scaleph-website/zh")
.license(new License().name("Apache 2.0").url("https://github.com/flowerfine/scaleph/blob/dev/LICENSE"))
.contact(contact());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import javax.validation.Valid;
import java.net.URI;
import java.util.List;
import java.util.Optional;

@Tag(name = "Flink Kubernetes管理-Job管理")
@RestController
Expand Down Expand Up @@ -171,6 +172,17 @@ public ResponseEntity<Page<WsFlinkKubernetesJobInstanceDTO>> listInstances(@Vali
return new ResponseEntity<>(result, HttpStatus.OK);
}

@Logging
@GetMapping("instances/current")
@Operation(summary = "获取任务当前实例", description = "获取任务当前实例")
public ResponseEntity<ResponseVO<WsFlinkKubernetesJobInstanceDTO>> currentInstance(@RequestParam("wsFlinkKubernetesJobId") Long wsFlinkKubernetesJobId) throws Exception {
Optional<WsFlinkKubernetesJobInstanceDTO> optional = wsFlinkKubernetesJobInstanceService.selectCurrent(wsFlinkKubernetesJobId);
if (optional.isPresent()) {
return new ResponseEntity<>(ResponseVO.success(optional.get()), HttpStatus.OK);
}
return new ResponseEntity<>(ResponseVO.error("not found"), HttpStatus.NOT_FOUND);
}

@Logging
@GetMapping("/instances/asYaml/{id}")
@Operation(summary = "查询 YAML 格式 Job 实例", description = "查询 YAML 格式 Job 实例")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public class WsFlinkKubernetesJobInstance extends BaseDO {
@TableField("user_flink_configuration")
private String userFlinkConfiguration;

@TableField("merged_flink_configuration")
private String mergedFlinkConfiguration;

@TableField(value = "`state`", updateStrategy = FieldStrategy.IGNORED)
private ResourceLifecycleState state;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
<result column="job_manager" property="jobManager"/>
<result column="task_manager" property="taskManager"/>
<result column="user_flink_configuration" property="userFlinkConfiguration"/>
<result column="merged_flink_configuration" property="mergedFlinkConfiguration"/>
<result column="state" property="state"/>
<result column="job_state" property="jobState"/>
<result column="error" property="error"/>
Expand All @@ -58,6 +59,7 @@
<result column="job_manager" property="jobManager"/>
<result column="task_manager" property="taskManager"/>
<result column="user_flink_configuration" property="userFlinkConfiguration"/>
<result column="merged_flink_configuration" property="mergedFlinkConfiguration"/>
<result column="state" property="state"/>
<result column="job_state" property="jobState"/>
<result column="error" property="error"/>
Expand All @@ -75,7 +77,7 @@
id, creator, create_time, editor, update_time,
ws_flink_kubernetes_job_id, instance_id,
parallelism, upgrade_mode, allow_non_restored_state,
job_manager, task_manager, user_flink_configuration,
job_manager, task_manager, user_flink_configuration, merged_flink_configuration,
`state`, job_state, `error`, cluster_info, task_manager_info,
start_time, end_time, duration
</sql>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Optional;

@Component
Expand Down Expand Up @@ -99,7 +98,7 @@ private void addLogging(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDep
private void mergeJobInstance(WsFlinkKubernetesJobInstanceDTO jobInstanceDTO, FlinkDeploymentSpec spec) {
spec.setJobManager(TemplateMerger.merge(spec.getJobManager(), jobInstanceDTO.getJobManager(), JobManagerSpec.class));
spec.setTaskManager(TemplateMerger.merge(spec.getTaskManager(), jobInstanceDTO.getTaskManager(), TaskManagerSpec.class));
spec.setFlinkConfiguration(TemplateMerger.merge(spec.getFlinkConfiguration(), jobInstanceDTO.getUserFlinkConfiguration(), Map.class));
spec.setFlinkConfiguration(jobInstanceDTO.getMergedFlinkConfiguration());
JobSpec job = spec.getJob();
if (jobInstanceDTO.getParallelism() != null) {
job.setParallelism(jobInstanceDTO.getParallelism());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ default WsFlinkKubernetesJobInstance toDo(WsFlinkKubernetesJobInstanceDTO dto) {
if (CollectionUtils.isEmpty(dto.getUserFlinkConfiguration()) == false) {
entity.setUserFlinkConfiguration(JacksonUtil.toJsonString(dto.getUserFlinkConfiguration()));
}
if (CollectionUtils.isEmpty(dto.getMergedFlinkConfiguration()) == false) {
entity.setMergedFlinkConfiguration(JacksonUtil.toJsonString(dto.getMergedFlinkConfiguration()));
}
if (CollectionUtils.isEmpty(dto.getClusterInfo()) == false) {
entity.setClusterInfo(JacksonUtil.toJsonString(dto.getClusterInfo()));
}
Expand All @@ -79,6 +82,9 @@ default WsFlinkKubernetesJobInstanceDTO toDto(WsFlinkKubernetesJobInstance entit
if (StringUtils.hasText(entity.getUserFlinkConfiguration())) {
dto.setUserFlinkConfiguration(JacksonUtil.parseJsonString(entity.getUserFlinkConfiguration(), Map.class));
}
if (StringUtils.hasText(entity.getMergedFlinkConfiguration())) {
dto.setMergedFlinkConfiguration(JacksonUtil.parseJsonString(entity.getMergedFlinkConfiguration(), Map.class));
}
if (StringUtils.hasText(entity.getClusterInfo())) {
dto.setClusterInfo(JacksonUtil.parseJsonString(entity.getClusterInfo(), Map.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public class WsFlinkKubernetesJobInstanceDTO extends BaseDO {
@Schema(description = "user flink configuration")
private Map<String, String> userFlinkConfiguration;

@Schema(description = "merged flink configuration")
private Map<String, String> mergedFlinkConfiguration;

@Schema(description = "deploy state")
private ResourceLifecycleState state;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstanceSavepoint;
import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceMapper;
import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceSavepointMapper;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.util.TemplateMerger;
import cn.sliew.scaleph.engine.flink.kubernetes.service.param.WsFlinkKubernetesJobInstanceDeployParam;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec;
import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.JobState;
Expand Down Expand Up @@ -161,16 +162,16 @@ public void deploy(WsFlinkKubernetesJobInstanceDeployParam param) throws Excepti
if (param.getUserFlinkConfiguration() != null) {
record.setUserFlinkConfiguration(JacksonUtil.toJsonString(param.getUserFlinkConfiguration()));
}
wsFlinkKubernetesJobInstanceMapper.insert(record);
WsFlinkKubernetesJobInstanceDTO jobInstanceDTO = selectOne(record.getId());
WsFlinkKubernetesJobDTO jobDTO = jobInstanceDTO.getWsFlinkKubernetesJob();
String yaml = asYaml(record.getId());
WsFlinkKubernetesJobDTO jobDTO = wsFlinkKubernetesJobService.selectOne(param.getWsFlinkKubernetesJobId());
Long clusterCredentialId = null;
String resource = null;
WatchCallbackHandler callbackHandler = null;
switch (jobDTO.getDeploymentKind()) {
case FLINK_DEPLOYMENT:
clusterCredentialId = jobDTO.getFlinkDeployment().getClusterCredentialId();
Map<String, String> flinkConfiguration = jobDTO.getFlinkDeployment().getFlinkConfiguration();
Map<String, String> mergedFlinkConfiguration = TemplateMerger.merge(flinkConfiguration, param.getUserFlinkConfiguration(), Map.class);
record.setMergedFlinkConfiguration(JacksonUtil.toJsonString(mergedFlinkConfiguration));
resource = Constant.FLINK_DEPLOYMENT;
callbackHandler = flinkDeploymentWatchCallbackHandler;
break;
Expand All @@ -181,6 +182,10 @@ public void deploy(WsFlinkKubernetesJobInstanceDeployParam param) throws Excepti
break;
default:
}
wsFlinkKubernetesJobInstanceMapper.insert(record);

WsFlinkKubernetesJobInstanceDTO jobInstanceDTO = selectOne(record.getId());
String yaml = asYaml(record.getId());
flinkKubernetesOperatorService.deployJob(clusterCredentialId, yaml);
// add watch
Map<String, String> lables = metadataHandler.generateLables(jobInstanceDTO);
Expand Down
27 changes: 17 additions & 10 deletions scaleph-ui-react/config/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,7 @@ export default [
{
path: '/workspace/engine/compute/flink/deployment/detail',
component: './Project/Workspace/Engine/Compute/Flink/Deployment/Detail',
},
{
name: 'job',
path: '/workspace/engine/compute/flink/job',
component: './Project/Workspace/Engine/Compute/Flink/Job',
},
{
path: '/workspace/engine/compute/flink/job/detail',
component: './Project/Workspace/Engine/Compute/Flink/Job/Detail',
},
}
]
}
]
Expand Down Expand Up @@ -283,6 +274,22 @@ export default [
},
]
},
{
name: 'project.operation',
path: '/workspace/operation',
icon: 'solution',
routes: [
{
name: 'flink',
path: '/workspace/operation/compute/flink/job',
component: './Project/Workspace/Engine/Compute/Flink/Job',
},
{
path: '/workspace/operation/compute/flink/job/detail',
component: './Project/Workspace/Engine/Compute/Flink/Job/Detail',
}
]
},
]
},
{
Expand Down
3 changes: 2 additions & 1 deletion scaleph-ui-react/src/locales/zh-CN/menu.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ export default {
'menu.project.engine.compute.flink.template': '部署模板',
'menu.project.engine.compute.flink.session-cluster': 'Session 集群',
'menu.project.engine.compute.flink.deployment': 'Deployment',
'menu.project.engine.compute.flink.job': '任务',
'menu.project.data-integration': '数据集成',
'menu.project.data-integration.seatunnel': 'SeaTunnel',
'menu.project.data-integration.flink-cdc': 'Flink CDC',
Expand All @@ -27,6 +26,8 @@ export default {
'menu.project.dag-scheduler': 'DAG 调度',
'menu.project.data-service': '数据服务',
'menu.project.data-service.config': '接口配置',
'menu.project.operation': '运维中心',
'menu.project.operation.flink': 'Flink任务',

'menu.resource': '资源',
'menu.resource.jar': '公共 Jar',
Expand Down
17 changes: 11 additions & 6 deletions scaleph-ui-react/src/locales/zh-CN/pages/project.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1014,17 +1014,22 @@ export default {
'pages.project.flink.kubernetes.job.detail.metrics': 'Metrics',
'pages.project.flink.kubernetes.job.detail.logs': 'Logs',

'pages.project.flink.kubernetes.job.detail.overview': '总览',
'pages.project.flink.kubernetes.job.detail.overview.artifact': 'Artifact',
'pages.project.flink.kubernetes.job.detail.overview.resource': '资源详情',
'pages.project.flink.kubernetes.job.detail.overview.configuration': 'Flink 配置',
'pages.project.flink.kubernetes.job.detail.savepoint': '状态管理',
'pages.project.flink.kubernetes.job.detail.savepoint.timeStamp': 'TimeStamp',
'pages.project.flink.kubernetes.job.detail.savepoint.formatType': 'Format',
'pages.project.flink.kubernetes.job.detail.savepoint.triggerType': 'Trigger Type',
'pages.project.flink.kubernetes.job.detail.savepoint.location': 'Location',
'pages.project.flink.kubernetes.job.detail.yaml': 'YAML',
'pages.project.flink.kubernetes.job.detail.instanceList': 'Intances',
'pages.project.flink.kubernetes.job.detail.instanceList': '历史实例',
'pages.project.flink.kubernetes.job.detail.instanceList.startTime': 'Start Time',
'pages.project.flink.kubernetes.job.detail.instanceList.endTime': 'End Time',
'pages.project.flink.kubernetes.job.detail.instanceList.duration': 'Duration',
'pages.project.flink.kubernetes.job.detail.instanceList.upgradeMode': 'Upgrade Mode',
'pages.project.flink.kubernetes.job.detail.savepoint': 'Savepoint',
'pages.project.flink.kubernetes.job.detail.savepoint.timeStamp': 'TimeStamp',
'pages.project.flink.kubernetes.job.detail.savepoint.formatType': 'Format',
'pages.project.flink.kubernetes.job.detail.savepoint.triggerType': 'Trigger Type',
'pages.project.flink.kubernetes.job.detail.savepoint.location': 'Location',


'pages.project.doris.template': 'Template',
'pages.project.doris.template.name': '名称',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import React, {useRef} from "react";
import {ActionType, ProCard, ProDescriptions, ProFormInstance} from "@ant-design/pro-components";
import {connect, useAccess, useIntl} from "@umijs/max";
import {Props} from '@/typings';
import {WsArtifactFlinkJar, WsFlinkKubernetesJob} from "@/services/project/typings";
import {ProDescriptionsItemProps} from "@ant-design/pro-descriptions";
import {ProCoreActionType} from "@ant-design/pro-utils/es/typing";
import {Button, message, Popconfirm} from "antd";
import {
AreaChartOutlined,
CameraOutlined,
CaretRightOutlined,
CloseOutlined,
DashboardOutlined, OrderedListOutlined,
RedoOutlined
} from "@ant-design/icons";
import {WsFlinkKubernetesJobService} from "@/services/project/WsFlinkKubernetesJobService";

const ArtifactFlinkJarWeb: React.FC<Props<WsFlinkKubernetesJob>> = (props: any) => {
const intl = useIntl();
const access = useAccess();
const actionRef = useRef<ActionType>();
const formRef = useRef<ProFormInstance>();


const descriptionColumns: ProDescriptionsItemProps<WsArtifactFlinkJar>[] = [
{
title: intl.formatMessage({id: 'pages.project.artifact.name'}),
key: `name`,
renderText: (text: any, record: WsArtifactFlinkJar, index: number, action: ProCoreActionType) => {
return record.artifact?.name
}
},
{
title: intl.formatMessage({id: 'pages.project.artifact.jar.fileName'}),
key: `fileName`,
dataIndex: 'fileName'
},
{
title: intl.formatMessage({id: 'pages.resource.flinkRelease.version'}),
key: `flinkVersion`,
dataIndex: 'flinkVersion',
renderText: (text: any, record: WsArtifactFlinkJar, index: number, action: ProCoreActionType) => {
return record.flinkVersion?.label
}
},
{
title: intl.formatMessage({id: 'pages.project.artifact.jar.entryClass'}),
key: `entryClass`,
dataIndex: 'entryClass'
},
{
title: intl.formatMessage({id: 'pages.project.artifact.jar.jarParams'}),
key: `jarParams`,
dataIndex: 'jarParams',
valueType: 'jsonCode'
},
{
title: intl.formatMessage({id: 'app.common.data.remark'}),
key: `remark`,
dataIndex: 'remark',
renderText: (text: any, record: WsArtifactFlinkJar, index: number, action: ProCoreActionType) => {
return record.artifact?.remark
}
},
{
title: intl.formatMessage({id: 'app.common.data.createTime'}),
key: `createTime`,
dataIndex: 'createTime',
},
{
title: intl.formatMessage({id: 'app.common.data.updateTime'}),
key: `updateTime`,
dataIndex: 'updateTime',
}
]

return (
<ProDescriptions
column={2}
bordered={true}
dataSource={props.flinkKubernetesJobDetail.job?.artifactFlinkJar}
columns={descriptionColumns}
/>
);
}


const mapModelToProps = ({flinkKubernetesJobDetail}: any) => ({flinkKubernetesJobDetail})
export default connect(mapModelToProps)(ArtifactFlinkJarWeb);
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import React, {useEffect, useState} from "react";
import {ProColumns, ProTable} from "@ant-design/pro-components";
import {connect, useAccess, useIntl} from "@umijs/max";

type Config = {
key: string;
value?: any;
};

const FlinkKubernetesJobDetailConfigurationWeb: React.FC = (props: any) => {
const intl = useIntl();
const access = useAccess();
const [dataSource, setDataSource] = useState<Config[]>([])

useEffect(() => {
if (props.flinkKubernetesJobDetail.job?.jobInstance) {
const config: Array<Config> = []
Object.entries<[string, any][]>(props.flinkKubernetesJobDetail.job?.jobInstance?.mergedFlinkConfiguration ? {...props.flinkKubernetesJobDetail.job?.jobInstance?.mergedFlinkConfiguration} : {}).forEach(([key, value]) => {
config.push({
key: key,
value: value
})
});
setDataSource(config)
}
}, [props.flinkKubernetesJobDetail.job]);

const tableColumns: ProColumns<Config>[] = [
{
dataIndex: 'key',
width: '40%'
},
{
dataIndex: 'value',
},
]

return (
<ProTable<Config>
rowKey="key"
columns={tableColumns}
dataSource={dataSource}
bordered
options={false}
search={false}
showHeader={false}
/>
);
}

const mapModelToProps = ({flinkKubernetesJobDetail}: any) => ({flinkKubernetesJobDetail})
export default connect(mapModelToProps)(FlinkKubernetesJobDetailConfigurationWeb);
Loading
Loading