diff --git a/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkSqlGatewayController.java b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkSqlGatewayController.java index 9f003fbfc..e92899069 100644 --- a/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkSqlGatewayController.java +++ b/scaleph-api/src/main/java/cn/sliew/scaleph/api/controller/ws/WsFlinkSqlGatewayController.java @@ -23,7 +23,10 @@ import cn.sliew.scaleph.engine.sql.gateway.dto.WsFlinkSqlGatewayQueryResultDTO; import cn.sliew.scaleph.engine.sql.gateway.dto.catalog.CatalogInfo; import cn.sliew.scaleph.engine.sql.gateway.services.WsFlinkSqlGatewayService; +import cn.sliew.scaleph.system.model.PaginationParam; import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.Parameters; import io.swagger.v3.oas.annotations.tags.Tag; import org.apache.flink.table.gateway.api.results.GatewayInfo; import org.apache.flink.table.gateway.api.results.ResultSet; @@ -32,7 +35,6 @@ import org.springframework.web.bind.annotation.*; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; @@ -50,50 +52,67 @@ public WsFlinkSqlGatewayController(WsFlinkSqlGatewayService wsFlinkSqlGatewaySer @GetMapping("{clusterId}/info") @Operation(summary = "获取SqlGateway信息", description = "获取SqlGateway信息") + @Parameters(@Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId")) public ResponseEntity getClusterInfo(@PathVariable("clusterId") String clusterId) { return ResponseEntity.ok(wsFlinkSqlGatewayService.getGatewayInfo(clusterId)); } - @RequestMapping(value = "{clusterId}/openSession", method = {RequestMethod.PUT, RequestMethod.POST}) + @PutMapping("{clusterId}/openSession") @Operation(summary = "新建Session", description = "新建Session") + @Parameters(@Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId")) public ResponseEntity openSession(@PathVariable("clusterId") String clusterId) { return ResponseEntity.ok(wsFlinkSqlGatewayService.openSession(clusterId)); } @DeleteMapping("{clusterId}/{sessionHandleId}") @Operation(summary = "关闭Session", description = "关闭Session") - public ResponseEntity closeSession( - @PathVariable("clusterId") String clusterId, - @PathVariable("sessionHandleId") String sessionHandleId) { + @Parameters({ + @Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"), + @Parameter(name = "sessionHandleId", description = "openSession() 返回结果") + }) + public ResponseEntity closeSession(@PathVariable("clusterId") String clusterId, + @PathVariable("sessionHandleId") String sessionHandleId) { return ResponseEntity.ok(wsFlinkSqlGatewayService.closeSession(clusterId, sessionHandleId)); } @GetMapping("{clusterId}/{sessionHandleId}/getCatalogInfo") @Operation(summary = "获取所有Catalog信息", description = "获取所有Catalog信息") - public ResponseEntity> getCatalogInfo( - @PathVariable("clusterId") String clusterId, - @PathVariable("sessionHandleId") String sessionHandleId, - @RequestParam(value = "includeSystemFunctions", defaultValue = "false") boolean includeSystemFunctions) { + @Parameters({ + @Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"), + @Parameter(name = "sessionHandleId", description = "openSession() 返回结果"), + @Parameter(name = "includeSystemFunctions", description = "是否返回系统函数") + }) + public ResponseEntity> getCatalogInfo(@PathVariable("clusterId") String clusterId, + @PathVariable("sessionHandleId") String sessionHandleId, + @RequestParam(value = "includeSystemFunctions", defaultValue = "false") boolean includeSystemFunctions) { return ResponseEntity.ok(wsFlinkSqlGatewayService.getCatalogInfo(clusterId, sessionHandleId, includeSystemFunctions)); } - @RequestMapping(value = "{clusterId}/{sessionHandleId}/executeSql", method = {RequestMethod.POST, RequestMethod.PUT}) + @PostMapping("{clusterId}/{sessionHandleId}/executeSql") @Operation(summary = "执行FlinkSql", description = "执行FlinkSql") + @Parameters({ + @Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"), + @Parameter(name = "sessionHandleId", description = "openSession() 返回结果") + }) public ResponseEntity executeSql(@PathVariable("clusterId") String clusterId, @PathVariable("sessionHandleId") String sessionHandleId, @RequestBody WsFlinkSqlGatewayQueryParamsDTO params) { return ResponseEntity.ok(wsFlinkSqlGatewayService.executeSql(clusterId, sessionHandleId, params)); } - @GetMapping("{clusterId}/{sessionHandleId}/{operationHandleId}/results/{token}") + @GetMapping("{clusterId}/{sessionHandleId}/{operationHandleId}/results") @Operation(summary = "获取Sql执行结果", description = "获取Sql执行结果") + @Parameters({ + @Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"), + @Parameter(name = "sessionHandleId", description = "openSession() 返回结果"), + @Parameter(name = "operationHandleId", description = "executeSql() 返回结果") + }) public ResponseEntity fetchResults(@PathVariable("clusterId") String clusterId, @PathVariable("sessionHandleId") String sessionHandleId, @PathVariable("operationHandleId") String operationHandleId, - @PathVariable(value = "token", required = false) Long token, - @RequestParam(value = "maxRows", required = false, defaultValue = "100") int maxRows) { - ResultSet resultSet = wsFlinkSqlGatewayService.fetchResults(clusterId, sessionHandleId, operationHandleId, token, maxRows); + PaginationParam param) { + ResultSet resultSet = wsFlinkSqlGatewayService.fetchResults(clusterId, sessionHandleId, operationHandleId, param.getCurrent(), param.getPageSize().intValue()); try { WsFlinkSqlGatewayQueryResultDTO wsFlinkSqlGatewayQueryResultDTO = WsFlinkSqlGatewayQueryResultDTO.fromResultSet(resultSet); return ResponseEntity.ok(wsFlinkSqlGatewayQueryResultDTO); @@ -104,6 +123,11 @@ public ResponseEntity fetchResults(@PathVariabl @DeleteMapping("{clusterId}/{sessionHandleId}/{operationHandleId}") @Operation(summary = "取消执行的任务", description = "取消执行的任务") + @Parameters({ + @Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"), + @Parameter(name = "sessionHandleId", description = "openSession() 返回结果"), + @Parameter(name = "operationHandleId", description = "executeSql() 返回结果") + }) public ResponseEntity cancel(@PathVariable("clusterId") String clusterId, @PathVariable("sessionHandleId") String sessionHandleId, @PathVariable("operationHandleId") String operationHandleId) { @@ -112,11 +136,16 @@ public ResponseEntity cancel(@PathVariable("clusterId") String clusterI @GetMapping("{clusterId}/{sessionHandleId}/completeStatement") @Operation(summary = "获取Sql提示", description = "获取Sql提示") - public ResponseEntity> completeStatement( - @PathVariable("clusterId") String clusterId, - @PathVariable("sessionHandleId") String sessionHandleId, - @RequestParam("statement") String statement, - @RequestParam("position") int position) { + @Parameters({ + @Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"), + @Parameter(name = "sessionHandleId", description = "openSession() 返回结果"), + @Parameter(name = "statement", description = "sql 片段"), + @Parameter(name = "position", description = "position") + }) + public ResponseEntity> completeStatement(@PathVariable("clusterId") String clusterId, + @PathVariable("sessionHandleId") String sessionHandleId, + @RequestParam("statement") String statement, + @RequestParam("position") int position) { try { return ResponseEntity.ok(wsFlinkSqlGatewayService.completeStatement(clusterId, sessionHandleId, statement, position)); } catch (Exception e) { @@ -126,21 +155,26 @@ public ResponseEntity> completeStatement( @PostMapping("{clusterId}/{sessionHandleId}/addDependencies") @Operation(summary = "添加依赖jar包", description = "添加依赖jar包") - public ResponseEntity addDependencies( - @PathVariable("clusterId") String clusterId, - @PathVariable("sessionHandleId") String sessionHandleId, - @RequestParam("jarIdList") List jarIdList - ) { + @Parameters({ + @Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"), + @Parameter(name = "sessionHandleId", description = "openSession() 返回结果"), + @Parameter(name = "jarIdList", description = "jar ids") + }) + public ResponseEntity addDependencies(@PathVariable("clusterId") String clusterId, + @PathVariable("sessionHandleId") String sessionHandleId, + @RequestParam("jarIdList") List jarIdList) { return ResponseEntity.ok(wsFlinkSqlGatewayService.addDependencies(clusterId, sessionHandleId, jarIdList)); } @PostMapping("{clusterId}/{sessionHandleId}/addCatalog") @Operation(summary = "添加catalog", description = "添加catalog") - public ResponseEntity addCatalog( - @PathVariable("clusterId") String clusterId, - @PathVariable("sessionHandleId") String sessionHandleId, - @RequestBody WsFlinkSqlGatewayCreateCatalogParamsDTO params - ) { + @Parameters({ + @Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"), + @Parameter(name = "sessionHandleId", description = "openSession() 返回结果") + }) + public ResponseEntity addCatalog(@PathVariable("clusterId") String clusterId, + @PathVariable("sessionHandleId") String sessionHandleId, + @RequestBody WsFlinkSqlGatewayCreateCatalogParamsDTO params) { return ResponseEntity.ok(wsFlinkSqlGatewayService.addCatalog(clusterId, sessionHandleId, params.getCatalogName(), params.getOptions())); } diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java index 58e9aa44f..bdf80bef1 100644 --- a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/DictType.java @@ -25,10 +25,7 @@ import cn.sliew.scaleph.common.dict.common.*; import cn.sliew.scaleph.common.dict.ds.RedisMode; import cn.sliew.scaleph.common.dict.flink.*; -import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentKind; -import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentMode; -import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState; -import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode; +import cn.sliew.scaleph.common.dict.flink.kubernetes.*; import cn.sliew.scaleph.common.dict.image.ImagePullPolicy; import cn.sliew.scaleph.common.dict.job.*; import cn.sliew.scaleph.common.dict.seatunnel.*; @@ -89,6 +86,8 @@ public enum DictType implements DictDefinition { FLINK_KUBERNETES_DEPLOYMENT_KIND("deployment_kind", "Deployment 类型", DeploymentKind.class), FLINK_KUBERNETES_RESOURCE_LIFECYCLE_STATE("resource_lifecycle_state", "Deployment 状态", ResourceLifecycleState.class), FLINK_KUBERNETES_UPGRADE_MODE("upgrade_mode", "Upgrade 方式", UpgradeMode.class), + FLINK_KUBERNETES_SAVEPOINT_FORMAT_TYPE("savepoint_format_type", "Savepoint Format 类型", SavepointFormatType.class), + FLINK_KUBERNETES_SAVEPOINT_TRIGGER_TYPE("savepoint_trigger_type", "Savepoint Trigger 类型", SavepointTriggerType.class), FLINK_CATALOG_COLUMN_TYPE("flink_catalog_column_type", "Flink Catalog Table Schema 列类型", CatalogColumnType.class), FLINK_CATALOG_TABLE_KIND("flink_catalog_table_kind", "Flink Catalog Table 类型", CatalogTableKind.class), diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/kubernetes/SavepointFormatType.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/kubernetes/SavepointFormatType.java new file mode 100644 index 000000000..24d1a7d74 --- /dev/null +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/kubernetes/SavepointFormatType.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.common.dict.flink.kubernetes; + +import cn.sliew.scaleph.common.dict.DictInstance; +import com.baomidou.mybatisplus.annotation.EnumValue; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonFormat; + +import java.util.Arrays; + +@JsonFormat(shape = JsonFormat.Shape.OBJECT) +public enum SavepointFormatType implements DictInstance { + + CANONICAL("CANONICAL", "CANONICAL", "A canonical, common for all state backends format."), + NATIVE("NATIVE", "NATIVE", "A format specific for the chosen state backend."), + UNKNOWN("UNKNOWN", "UNKNOWN", "unknown"), + ; + + @JsonCreator + public static SavepointFormatType of(String value) { + return Arrays.stream(values()) + .filter(instance -> instance.getValue().equals(value)) + .findAny().orElseThrow(() -> new EnumConstantNotPresentException(SavepointFormatType.class, value)); + } + + @EnumValue + private String value; + private String label; + private String remark; + + SavepointFormatType(String value, String label, String remark) { + this.value = value; + this.label = label; + this.remark = remark; + } + + @Override + public String getValue() { + return value; + } + + @Override + public String getLabel() { + return label; + } + + @Override + public String getRemark() { + return remark; + } + + +} diff --git a/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/kubernetes/SavepointTriggerType.java b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/kubernetes/SavepointTriggerType.java new file mode 100644 index 000000000..e7d1d75c7 --- /dev/null +++ b/scaleph-common/src/main/java/cn/sliew/scaleph/common/dict/flink/kubernetes/SavepointTriggerType.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.common.dict.flink.kubernetes; + +import cn.sliew.scaleph.common.dict.DictInstance; +import com.baomidou.mybatisplus.annotation.EnumValue; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonFormat; + +import java.util.Arrays; + +@JsonFormat(shape = JsonFormat.Shape.OBJECT) +public enum SavepointTriggerType implements DictInstance { + + MANUAL("MANUAL", "MANUAL", "Savepoint manually triggered by changing the savepointTriggerNonce."), + PERIODIC("PERIODIC", "PERIODIC", "Savepoint periodically triggered by the operator."), + UPGRADE("UPGRADE", "UPGRADE", "Savepoint triggered during stateful upgrade."), + UNKNOWN("UNKNOWN", "UNKNOWN", "Savepoint trigger mechanism unknown, such as savepoint retrieved directly from Flink job."), + ; + + @JsonCreator + public static SavepointTriggerType of(String value) { + return Arrays.stream(values()) + .filter(instance -> instance.getValue().equals(value)) + .findAny().orElseThrow(() -> new EnumConstantNotPresentException(SavepointTriggerType.class, value)); + } + + @EnumValue + private String value; + private String label; + private String remark; + + SavepointTriggerType(String value, String label, String remark) { + this.value = value; + this.label = label; + this.remark = remark; + } + + @Override + public String getValue() { + return value; + } + + @Override + public String getLabel() { + return label; + } + + @Override + public String getRemark() { + return remark; + } + + +} diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkKubernetesJobInstanceSavepoint.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkKubernetesJobInstanceSavepoint.java new file mode 100644 index 000000000..f8cb24477 --- /dev/null +++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/entity/master/ws/WsFlinkKubernetesJobInstanceSavepoint.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.dao.entity.master.ws; + +import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointFormatType; +import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointTriggerType; +import cn.sliew.scaleph.dao.entity.BaseDO; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +/** + * flink kubernetes job instance savepoint + */ +@Data +@TableName("ws_flink_kubernetes_job_instance_savepoint") +public class WsFlinkKubernetesJobInstanceSavepoint extends BaseDO { + + private static final long serialVersionUID = 1L; + + @TableField("ws_flink_kubernetes_job_instance_id") + private Long wsFlinkKubernetesJobInstanceId; + + @TableField("time_stamp") + private Long timeStamp; + + @TableField("location") + private String location; + + @TableField("trigger_type") + private SavepointTriggerType triggerType; + + @TableField("format_type") + private SavepointFormatType formatType; +} diff --git a/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkKubernetesJobInstanceSavepointMapper.java b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkKubernetesJobInstanceSavepointMapper.java new file mode 100644 index 000000000..48f8c304b --- /dev/null +++ b/scaleph-dao/src/main/java/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkKubernetesJobInstanceSavepointMapper.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.dao.mapper.master.ws; + +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstanceSavepoint; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.springframework.stereotype.Repository; + +/** + * flink kubernetes job instance savepoint Mapper 接口 + */ +@Repository +public interface WsFlinkKubernetesJobInstanceSavepointMapper extends BaseMapper { + +} diff --git a/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkKubernetesJobInstanceSavepointMapper.xml b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkKubernetesJobInstanceSavepointMapper.xml new file mode 100644 index 000000000..a9c4c60c5 --- /dev/null +++ b/scaleph-dao/src/main/resources/cn/sliew/scaleph/dao/mapper/master/ws/WsFlinkKubernetesJobInstanceSavepointMapper.xml @@ -0,0 +1,44 @@ + + + + + + + + + + + + + + + + + + + + + + id, + creator, + create_time, + editor, + update_time, + ws_flink_kubernetes_job_instance_id, time_stamp, location, trigger_type, format_type + + diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/WsFlinkKubernetesJobInstanceService.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/WsFlinkKubernetesJobInstanceService.java index de130d9bf..40a163c8f 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/WsFlinkKubernetesJobInstanceService.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/WsFlinkKubernetesJobInstanceService.java @@ -21,8 +21,10 @@ import cn.sliew.scaleph.engine.flink.kubernetes.operator.status.FlinkDeploymentStatus; import cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.deployment.FlinkDeployment; import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO; +import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceSavepointDTO; import cn.sliew.scaleph.engine.flink.kubernetes.service.param.WsFlinkKubernetesJobInstanceDeployParam; import cn.sliew.scaleph.engine.flink.kubernetes.service.param.WsFlinkKubernetesJobInstanceListParam; +import cn.sliew.scaleph.engine.flink.kubernetes.service.param.WsFlinkKubernetesJobInstanceSavepointListParam; import cn.sliew.scaleph.engine.flink.kubernetes.service.param.WsFlinkKubernetesJobInstanceShutdownParam; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import io.fabric8.kubernetes.api.model.GenericKubernetesResource; @@ -37,6 +39,8 @@ public interface WsFlinkKubernetesJobInstanceService { Optional selectCurrent(Long wsFlinkKubernetesJobId); + Page selectSavepoint(WsFlinkKubernetesJobInstanceSavepointListParam param); + String mockYaml(Long wsFlinkKubernetesJobId); String asYaml(Long id) throws Exception; diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/convert/WsFlinkKubernetesJobInstanceSavepointConvert.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/convert/WsFlinkKubernetesJobInstanceSavepointConvert.java new file mode 100644 index 000000000..540bf0e1e --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/convert/WsFlinkKubernetesJobInstanceSavepointConvert.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.kubernetes.service.convert; + +import cn.sliew.scaleph.common.convert.BaseConvert; +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstanceSavepoint; +import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceSavepointDTO; +import org.mapstruct.Mapper; +import org.mapstruct.ReportingPolicy; +import org.mapstruct.factory.Mappers; + +@Mapper(unmappedTargetPolicy = ReportingPolicy.IGNORE) +public interface WsFlinkKubernetesJobInstanceSavepointConvert extends BaseConvert { + WsFlinkKubernetesJobInstanceSavepointConvert INSTANCE = Mappers.getMapper(WsFlinkKubernetesJobInstanceSavepointConvert.class); + +} diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/dto/WsFlinkKubernetesJobInstanceSavepointDTO.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/dto/WsFlinkKubernetesJobInstanceSavepointDTO.java new file mode 100644 index 000000000..94e4d7598 --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/dto/WsFlinkKubernetesJobInstanceSavepointDTO.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.kubernetes.service.dto; + +import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointFormatType; +import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointTriggerType; +import cn.sliew.scaleph.system.model.BaseDTO; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +/** + *

+ * flink kubernetes job instance + *

+ */ +@Data +@Schema(name = "WsFlinkKubernetesJobInstance对象", description = "flink kubernetes job instance") +public class WsFlinkKubernetesJobInstanceSavepointDTO extends BaseDTO { + + private static final long serialVersionUID = 1L; + + @Schema(description = "flink kubernetes job instance id") + private Long wsFlinkKubernetesJobInstanceId; + + @Schema(description = "savepoint timestamp") + private Long timeStamp; + + @Schema(description = "savepoint location") + private String location; + + @Schema(description = "savepoint trigger type") + private SavepointTriggerType triggerType; + + @Schema(description = "savepoint format type") + private SavepointFormatType formatType; +} diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java index edd4fe6ec..794d10d20 100644 --- a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/impl/WsFlinkKubernetesJobInstanceServiceImpl.java @@ -22,22 +22,31 @@ import cn.sliew.milky.common.util.JacksonUtil; import cn.sliew.scaleph.common.dict.flink.FlinkJobState; import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState; +import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointFormatType; +import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointTriggerType; import cn.sliew.scaleph.common.util.UUIDUtil; import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstance; +import cn.sliew.scaleph.dao.entity.master.ws.WsFlinkKubernetesJobInstanceSavepoint; import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceMapper; +import cn.sliew.scaleph.dao.mapper.master.ws.WsFlinkKubernetesJobInstanceSavepointMapper; import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.FlinkDeploymentSpec; import cn.sliew.scaleph.engine.flink.kubernetes.operator.spec.JobState; import cn.sliew.scaleph.engine.flink.kubernetes.operator.status.FlinkDeploymentStatus; import cn.sliew.scaleph.engine.flink.kubernetes.operator.status.JobStatus; +import cn.sliew.scaleph.engine.flink.kubernetes.operator.status.Savepoint; +import cn.sliew.scaleph.engine.flink.kubernetes.operator.status.SavepointInfo; import cn.sliew.scaleph.engine.flink.kubernetes.resource.definition.job.instance.FlinkJobInstanceConverterFactory; import cn.sliew.scaleph.engine.flink.kubernetes.service.FlinkKubernetesOperatorService; import cn.sliew.scaleph.engine.flink.kubernetes.service.WsFlinkKubernetesJobInstanceService; import cn.sliew.scaleph.engine.flink.kubernetes.service.WsFlinkKubernetesJobService; import cn.sliew.scaleph.engine.flink.kubernetes.service.convert.WsFlinkKubernetesJobInstanceConvert; +import cn.sliew.scaleph.engine.flink.kubernetes.service.convert.WsFlinkKubernetesJobInstanceSavepointConvert; import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobDTO; import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceDTO; +import cn.sliew.scaleph.engine.flink.kubernetes.service.dto.WsFlinkKubernetesJobInstanceSavepointDTO; import cn.sliew.scaleph.engine.flink.kubernetes.service.param.WsFlinkKubernetesJobInstanceDeployParam; import cn.sliew.scaleph.engine.flink.kubernetes.service.param.WsFlinkKubernetesJobInstanceListParam; +import cn.sliew.scaleph.engine.flink.kubernetes.service.param.WsFlinkKubernetesJobInstanceSavepointListParam; import cn.sliew.scaleph.engine.flink.kubernetes.service.param.WsFlinkKubernetesJobInstanceShutdownParam; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; @@ -65,6 +74,8 @@ public class WsFlinkKubernetesJobInstanceServiceImpl implements WsFlinkKubernete @Autowired private WsFlinkKubernetesJobInstanceMapper wsFlinkKubernetesJobInstanceMapper; @Autowired + private WsFlinkKubernetesJobInstanceSavepointMapper wsFlinkKubernetesJobInstanceSavepointMapper; + @Autowired private WsFlinkKubernetesJobService wsFlinkKubernetesJobService; @Autowired private FlinkKubernetesOperatorService flinkKubernetesOperatorService; @@ -97,6 +108,19 @@ public Optional selectCurrent(Long wsFlinkKuber return optional.map(record -> WsFlinkKubernetesJobInstanceConvert.INSTANCE.toDto(record)); } + @Override + public Page selectSavepoint(WsFlinkKubernetesJobInstanceSavepointListParam param) { + Page page = new Page<>(param.getCurrent(), param.getPageSize()); + LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(WsFlinkKubernetesJobInstanceSavepoint.class) + .eq(WsFlinkKubernetesJobInstanceSavepoint::getWsFlinkKubernetesJobInstanceId, param.getWsFlinkKubernetesJobInstanceId()) + .orderByDesc(WsFlinkKubernetesJobInstanceSavepoint::getTimeStamp); + Page wsFlinkKubernetesJobInstanceSavepointPage = wsFlinkKubernetesJobInstanceSavepointMapper.selectPage(page, queryWrapper); + Page result = new Page<>(wsFlinkKubernetesJobInstanceSavepointPage.getCurrent(), wsFlinkKubernetesJobInstanceSavepointPage.getSize(), wsFlinkKubernetesJobInstanceSavepointPage.getTotal()); + List wsFlinkKubernetesJobInstanceSavepointDTOS = WsFlinkKubernetesJobInstanceSavepointConvert.INSTANCE.toDto(wsFlinkKubernetesJobInstanceSavepointPage.getRecords()); + result.setRecords(wsFlinkKubernetesJobInstanceSavepointDTOS); + return result; + } + @Override public String mockYaml(Long wsFlinkKubernetesJobId) { WsFlinkKubernetesJobDTO jobDTO = wsFlinkKubernetesJobService.selectOne(wsFlinkKubernetesJobId); @@ -315,9 +339,35 @@ public int updateStatus(Long id, FlinkDeploymentStatus status) { } else { record.setTaskManagerInfo(JacksonUtil.toJsonString(status.getTaskManager())); } + updateSavepoint(id, status); return wsFlinkKubernetesJobInstanceMapper.updateById(record); } + private void updateSavepoint(Long id, FlinkDeploymentStatus status) { + SavepointInfo savepointInfo = status.getJobStatus().getSavepointInfo(); + if (savepointInfo == null) { + return; + } + if (CollectionUtils.isEmpty(savepointInfo.getSavepointHistory()) == false) { + for (Savepoint savepoint : savepointInfo.getSavepointHistory()) { + WsFlinkKubernetesJobInstanceSavepoint record = new WsFlinkKubernetesJobInstanceSavepoint(); + record.setWsFlinkKubernetesJobInstanceId(id); + record.setTimeStamp(savepoint.getTimeStamp()); + record.setLocation(savepoint.getLocation()); + record.setTriggerType(EnumUtils.getEnum(SavepointTriggerType.class, savepoint.getTriggerType().name())); + record.setFormatType(EnumUtils.getEnum(SavepointFormatType.class, savepoint.getFormatType().name())); + + LambdaQueryWrapper queryWrapper = Wrappers.lambdaQuery(WsFlinkKubernetesJobInstanceSavepoint.class) + .eq(WsFlinkKubernetesJobInstanceSavepoint::getWsFlinkKubernetesJobInstanceId, id) + .eq(WsFlinkKubernetesJobInstanceSavepoint::getTimeStamp, savepoint.getTimeStamp()); + WsFlinkKubernetesJobInstanceSavepoint oldRecord = wsFlinkKubernetesJobInstanceSavepointMapper.selectOne(queryWrapper); + if (oldRecord == null) { + wsFlinkKubernetesJobInstanceSavepointMapper.insert(record); + } + } + } + } + @Override public int clearStatus(Long id) { WsFlinkKubernetesJobInstance record = new WsFlinkKubernetesJobInstance(); diff --git a/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/param/WsFlinkKubernetesJobInstanceSavepointListParam.java b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/param/WsFlinkKubernetesJobInstanceSavepointListParam.java new file mode 100644 index 000000000..5ba303f14 --- /dev/null +++ b/scaleph-engine/scaleph-engine-flink-kubernetes/src/main/java/cn/sliew/scaleph/engine/flink/kubernetes/service/param/WsFlinkKubernetesJobInstanceSavepointListParam.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.sliew.scaleph.engine.flink.kubernetes.service.param; + +import cn.sliew.scaleph.system.model.PaginationParam; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import javax.validation.constraints.NotNull; + +@Data +public class WsFlinkKubernetesJobInstanceSavepointListParam extends PaginationParam { + + @NotNull + @Schema(description = "flink kubernetes job instance id") + private Long wsFlinkKubernetesJobInstanceId; +} diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/dto/WsFlinkSqlGatewayQueryParamsDTO.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/dto/WsFlinkSqlGatewayQueryParamsDTO.java index bf36c31f9..1514c638f 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/dto/WsFlinkSqlGatewayQueryParamsDTO.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/dto/WsFlinkSqlGatewayQueryParamsDTO.java @@ -21,7 +21,6 @@ import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; import lombok.EqualsAndHashCode; -import lombok.NoArgsConstructor; import lombok.NonNull; import java.util.Map; @@ -29,9 +28,12 @@ @Data @EqualsAndHashCode @Schema(name = "SqlGateway执行Sql的参数", description = "SqlGateway执行Sql的参数") -@NoArgsConstructor public class WsFlinkSqlGatewayQueryParamsDTO { + @NonNull + @Schema(description = "sql") private String sql; + + @Schema(description = "配置参数") private Map configuration; } diff --git a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySessionManager.java b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySessionManager.java index a589bc070..90a44f082 100644 --- a/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySessionManager.java +++ b/scaleph-engine/scaleph-engine-sql-gateway/src/main/java/cn/sliew/scaleph/engine/sql/gateway/internal/ScalephSqlGatewaySessionManager.java @@ -40,6 +40,7 @@ import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.resource.ResourceType; import org.apache.flink.table.resource.ResourceUri; +import org.springframework.util.CollectionUtils; import java.io.IOException; import java.net.URI; @@ -267,7 +268,9 @@ public List completeStatement(SessionHandle sessionHandle, String statem public String executeStatement(SessionHandle sessionHandle, Map configuration, String sql) { SessionContext sessionContext = getSession(sessionHandle).getSessionContext(); Configuration sessionConf = new Configuration(sessionContext.getSessionConf()); - configuration.forEach(sessionConf::setString); + if (CollectionUtils.isEmpty(configuration) == false) { + configuration.forEach(sessionConf::setString); + } return sessionContext.getOperationManager() .submitOperation(handle -> sessionContext.createOperationExecutor(sessionConf) diff --git a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx index b615b0470..22b9bf8eb 100644 --- a/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx +++ b/scaleph-ui-react/src/pages/Project/Workspace/Kubernetes/Job/Detail/index.tsx @@ -1,6 +1,6 @@ import {connect, useAccess, useIntl, useLocation} from "umi"; import React, {useEffect, useState} from "react"; -import {Button, Tabs} from "antd"; +import {Button, message, Popconfirm, Tabs} from "antd"; import FlinkKubernetesJobDetailYAMLWeb from "@/pages/Project/Workspace/Kubernetes/Job/Detail/YAML"; import {ProDescriptionsItemProps} from "@ant-design/pro-descriptions"; import {WsFlinkKubernetesJob} from "@/services/project/typings"; @@ -112,15 +112,24 @@ const FlinkKubernetesJobDetailWeb: React.FC = (props: any) => { > {intl.formatMessage({id: 'pages.project.flink.kubernetes.job.detail.deploy'})} - + + + + ,
diff --git a/tools/docker/mysql/init.d/scaleph-ws-mysql.sql b/tools/docker/mysql/init.d/scaleph-ws-mysql.sql index 0d5bda458..0e1aaefee 100644 --- a/tools/docker/mysql/init.d/scaleph-ws-mysql.sql +++ b/tools/docker/mysql/init.d/scaleph-ws-mysql.sql @@ -47,6 +47,8 @@ INSERT INTO `ws_flink_artifact` (`id`, `project_id`, `type`, `name`, `remark`, ` VALUES (5, 1, '2', 'fake', NULL, 'sys', 'sys'); INSERT INTO `ws_flink_artifact`(`id`, `project_id`, `type`, `name`, `remark`, `creator`, `editor`) VALUES (6, 1, '0', 'catalog-example', NULL, 'sys', 'sys'); +INSERT INTO `ws_flink_artifact` (`id`, `project_id`, `type`, `name`, `remark`, `creator`, `editor`) +VALUES (7, 1, '0', 'select-example', NULL, 'sys', 'sys'); drop table if exists ws_flink_artifact_jar; create table ws_flink_artifact_jar @@ -103,6 +105,11 @@ INSERT INTO `ws_flink_artifact_sql`(`id`, `flink_artifact_id`, `flink_version`, VALUES (4, 6, '1.17.1', 'CREATE CATALOG my_catalog WITH (\n \'type\' = \'generic_in_memory\'\n);\n\nCREATE DATABASE my_catalog.my_database;\n\n\nCREATE TABLE my_catalog.my_database.source_table (\n `id` bigint,\n `name` string,\n `age` int,\n `address` string,\n `create_time`TIMESTAMP(3),\n `update_time`TIMESTAMP(3),\n WATERMARK FOR `update_time` AS update_time - INTERVAL \'1\' MINUTE\n)\nCOMMENT \'\'\nWITH (\n \'connector\' = \'datagen\',\n \'number-of-rows\' = \'100\'\n);\n\nCREATE TABLE my_catalog.my_database.sink_table (\n `id` BIGINT,\n `name` VARCHAR(2147483647),\n `age` INT,\n `address` VARCHAR(2147483647),\n `create_time` TIMESTAMP(3),\n `update_time` TIMESTAMP(3)\n)\nCOMMENT \'\'\nWITH (\n \'connector\' = \'print\'\n);\n\nINSERT INTO my_catalog.my_database.sink_table\nSELECT id, name, age, address, create_time, update_time FROM my_catalog.my_database.source_table;', '1', 'sys', 'sys'); +INSERT INTO `ws_flink_artifact_sql` (`id`, `flink_artifact_id`, `flink_version`, `script`, `current`, `creator`, + `editor`) +VALUES (5, 7, '1.17.1', + 'CREATE CATALOG my_catalog WITH (\n \'type\' = \'generic_in_memory\'\n);\n\nCREATE DATABASE my_catalog.my_database;\n\n\nCREATE TABLE my_catalog.my_database.source_table (\n `id` bigint,\n `name` string,\n `age` int,\n `address` string,\n `create_time`TIMESTAMP(3),\n `update_time`TIMESTAMP(3),\n WATERMARK FOR `update_time` AS update_time - INTERVAL \'1\' MINUTE\n)\nCOMMENT \'\'\nWITH (\n \'connector\' = \'datagen\'\n);\n\nSELECT * FROM my_catalog.my_database.source_table;', + '1', 'sys', 'sys'); drop table if exists ws_di_job; create table ws_di_job @@ -203,34 +210,6 @@ INSERT INTO ws_di_job_link(job_id, link_code, from_step_code, to_step_code, crea VALUES (2, 'd57021a1-65c7-4dfe-ae89-3b73d00fcf72', '6223c6c3-b552-4c69-adab-5300b7514fad', 'f08143b4-34dc-4190-8723-e8d8ce49738f', 'sys', 'sys'); -DROP TABLE IF EXISTS ws_flink_checkpoint; -CREATE TABLE ws_flink_checkpoint -( - id bigint not null auto_increment comment 'id', - flink_job_instance_id bigint not null comment 'flink job instance id', - flink_checkpoint_id bigint not null comment 'flink checkpoint id', - checkpoint_type varchar(16) not null comment 'checkpoint type', - `status` varchar(16) not null comment 'checkpoint status', - `savepoint` tinyint not null comment 'is savepoint', - trigger_timestamp bigint not null comment 'checkpoint trigger timestamp', - duration bigint comment 'checkpoint duration', - discarded tinyint not null comment 'is discarded', - external_path text comment 'checkpoint path', - state_size bigint comment 'state size', - processed_data bigint comment 'processed data size', - persisted_data bigint comment 'persisted data size', - alignment_buffered bigint comment 'checkpoint alignment buffered size', - num_subtasks int comment 'subtask nums', - num_acknowledged_subtasks int comment 'acknowledged subtask nums', - latest_ack_timestamp bigint comment 'latest acknowledged subtask timestamp', - creator varchar(32) comment '创建人', - create_time timestamp default current_timestamp comment '创建时间', - editor varchar(32) comment '修改人', - update_time timestamp default current_timestamp on update current_timestamp comment '修改时间', - PRIMARY KEY (id), - UNIQUE KEY uniq_job (flink_job_instance_id, flink_checkpoint_id) -) ENGINE = INNODB COMMENT = 'flink checkpoint'; - DROP TABLE IF EXISTS ws_flink_kubernetes_template; CREATE TABLE ws_flink_kubernetes_template ( @@ -445,3 +424,20 @@ CREATE TABLE ws_flink_kubernetes_job_instance PRIMARY KEY (id), UNIQUE KEY uniq_key (ws_flink_kubernetes_job_id, instance_id) ) ENGINE = INNODB COMMENT = 'flink kubernetes job instance'; + +DROP TABLE IF EXISTS ws_flink_kubernetes_job_instance_savepoint; +CREATE TABLE ws_flink_kubernetes_job_instance_savepoint +( + id bigint not null auto_increment, + ws_flink_kubernetes_job_instance_id bigint not null, + time_stamp bigint not null, + location text, + trigger_type varchar(32), + format_type varchar(32), + creator varchar(32), + create_time datetime not null default current_timestamp, + editor varchar(32), + update_time datetime not null default current_timestamp on update current_timestamp, + PRIMARY KEY (id), + UNIQUE KEY uniq_key (ws_flink_kubernetes_job_instance_id, time_stamp) +) ENGINE = INNODB COMMENT = 'flink kubernetes job instance savepoint'; \ No newline at end of file