Skip to content

Commit

Permalink
[Feature][scaleph-engine-flink-kubernetes] update flink savepoint ser…
Browse files Browse the repository at this point in the history
…vice (#585)

* feature: update flink kubernetes job instance service

* feature: add popconfirm for restart and savepoint

* feature: add flink job savepoint table

* feature: add flink job savepoint table

* feature: add flink job savepoint service

* feature: add flink job savepoint service

* feature: update flink sql gateway open api and add flink sql select example
  • Loading branch information
kalencaya authored Aug 4, 2023
1 parent 7a1c950 commit cb3112b
Show file tree
Hide file tree
Showing 16 changed files with 568 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import cn.sliew.scaleph.engine.sql.gateway.dto.WsFlinkSqlGatewayQueryResultDTO;
import cn.sliew.scaleph.engine.sql.gateway.dto.catalog.CatalogInfo;
import cn.sliew.scaleph.engine.sql.gateway.services.WsFlinkSqlGatewayService;
import cn.sliew.scaleph.system.model.PaginationParam;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.Parameters;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.apache.flink.table.gateway.api.results.GatewayInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
Expand All @@ -32,7 +35,6 @@
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand All @@ -50,50 +52,67 @@ public WsFlinkSqlGatewayController(WsFlinkSqlGatewayService wsFlinkSqlGatewaySer

/**
 * Returns info about the Flink SQL Gateway running in the given session cluster.
 * Delegates directly to {@code wsFlinkSqlGatewayService.getGatewayInfo}.
 *
 * @param clusterId sessionClusterId of the flink kubernetes session-cluster
 * @return gateway info as reported by the SQL Gateway
 */
@GetMapping("{clusterId}/info")
@Operation(summary = "获取SqlGateway信息", description = "获取SqlGateway信息")
@Parameters(@Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"))
public ResponseEntity<GatewayInfo> getClusterInfo(@PathVariable("clusterId") String clusterId) {
return ResponseEntity.ok(wsFlinkSqlGatewayService.getGatewayInfo(clusterId));
}

@RequestMapping(value = "{clusterId}/openSession", method = {RequestMethod.PUT, RequestMethod.POST})
/**
 * Opens a new SQL Gateway session on the given session cluster.
 * The returned handle id is required by all subsequent per-session endpoints
 * (executeSql, fetchResults, closeSession, ...).
 *
 * @param clusterId sessionClusterId of the flink kubernetes session-cluster
 * @return the new session handle id
 */
@PutMapping("{clusterId}/openSession")
@Operation(summary = "新建Session", description = "新建Session")
@Parameters(@Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"))
public ResponseEntity<String> openSession(@PathVariable("clusterId") String clusterId) {
return ResponseEntity.ok(wsFlinkSqlGatewayService.openSession(clusterId));
}

@DeleteMapping("{clusterId}/{sessionHandleId}")
@Operation(summary = "关闭Session", description = "关闭Session")
public ResponseEntity<String> closeSession(
@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId) {
@Parameters({
@Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"),
@Parameter(name = "sessionHandleId", description = "openSession() 返回结果")
})
public ResponseEntity<String> closeSession(@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId) {
return ResponseEntity.ok(wsFlinkSqlGatewayService.closeSession(clusterId, sessionHandleId));
}

@GetMapping("{clusterId}/{sessionHandleId}/getCatalogInfo")
@Operation(summary = "获取所有Catalog信息", description = "获取所有Catalog信息")
public ResponseEntity<Set<CatalogInfo>> getCatalogInfo(
@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId,
@RequestParam(value = "includeSystemFunctions", defaultValue = "false") boolean includeSystemFunctions) {
@Parameters({
@Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"),
@Parameter(name = "sessionHandleId", description = "openSession() 返回结果"),
@Parameter(name = "includeSystemFunctions", description = "是否返回系统函数")
})
public ResponseEntity<Set<CatalogInfo>> getCatalogInfo(@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId,
@RequestParam(value = "includeSystemFunctions", defaultValue = "false") boolean includeSystemFunctions) {
return ResponseEntity.ok(wsFlinkSqlGatewayService.getCatalogInfo(clusterId, sessionHandleId, includeSystemFunctions));
}


@RequestMapping(value = "{clusterId}/{sessionHandleId}/executeSql", method = {RequestMethod.POST, RequestMethod.PUT})
/**
 * Submits a Flink SQL statement for execution inside an open gateway session.
 * Execution is asynchronous: the returned operation handle id is used with the
 * results endpoint to fetch rows, or with DELETE to cancel the operation.
 *
 * @param clusterId       sessionClusterId of the flink kubernetes session-cluster
 * @param sessionHandleId handle returned by openSession()
 * @param params          statement and execution options (request body)
 * @return the operation handle id for the submitted statement
 */
@PostMapping("{clusterId}/{sessionHandleId}/executeSql")
@Operation(summary = "执行FlinkSql", description = "执行FlinkSql")
@Parameters({
@Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"),
@Parameter(name = "sessionHandleId", description = "openSession() 返回结果")
})
public ResponseEntity<String> executeSql(@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId,
@RequestBody WsFlinkSqlGatewayQueryParamsDTO params) {
return ResponseEntity.ok(wsFlinkSqlGatewayService.executeSql(clusterId, sessionHandleId, params));
}

@GetMapping("{clusterId}/{sessionHandleId}/{operationHandleId}/results/{token}")
@GetMapping("{clusterId}/{sessionHandleId}/{operationHandleId}/results")
@Operation(summary = "获取Sql执行结果", description = "获取Sql执行结果")
@Parameters({
@Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"),
@Parameter(name = "sessionHandleId", description = "openSession() 返回结果"),
@Parameter(name = "operationHandleId", description = "executeSql() 返回结果")
})
public ResponseEntity<WsFlinkSqlGatewayQueryResultDTO> fetchResults(@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId,
@PathVariable("operationHandleId") String operationHandleId,
@PathVariable(value = "token", required = false) Long token,
@RequestParam(value = "maxRows", required = false, defaultValue = "100") int maxRows) {
ResultSet resultSet = wsFlinkSqlGatewayService.fetchResults(clusterId, sessionHandleId, operationHandleId, token, maxRows);
PaginationParam param) {
ResultSet resultSet = wsFlinkSqlGatewayService.fetchResults(clusterId, sessionHandleId, operationHandleId, param.getCurrent(), param.getPageSize().intValue());
try {
WsFlinkSqlGatewayQueryResultDTO wsFlinkSqlGatewayQueryResultDTO = WsFlinkSqlGatewayQueryResultDTO.fromResultSet(resultSet);
return ResponseEntity.ok(wsFlinkSqlGatewayQueryResultDTO);
Expand All @@ -104,6 +123,11 @@ public ResponseEntity<WsFlinkSqlGatewayQueryResultDTO> fetchResults(@PathVariabl

@DeleteMapping("{clusterId}/{sessionHandleId}/{operationHandleId}")
@Operation(summary = "取消执行的任务", description = "取消执行的任务")
@Parameters({
@Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"),
@Parameter(name = "sessionHandleId", description = "openSession() 返回结果"),
@Parameter(name = "operationHandleId", description = "executeSql() 返回结果")
})
public ResponseEntity<Boolean> cancel(@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId,
@PathVariable("operationHandleId") String operationHandleId) {
Expand All @@ -112,11 +136,16 @@ public ResponseEntity<Boolean> cancel(@PathVariable("clusterId") String clusterI

@GetMapping("{clusterId}/{sessionHandleId}/completeStatement")
@Operation(summary = "获取Sql提示", description = "获取Sql提示")
public ResponseEntity<List<String>> completeStatement(
@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId,
@RequestParam("statement") String statement,
@RequestParam("position") int position) {
@Parameters({
@Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"),
@Parameter(name = "sessionHandleId", description = "openSession() 返回结果"),
@Parameter(name = "statement", description = "sql 片段"),
@Parameter(name = "position", description = "position")
})
public ResponseEntity<List<String>> completeStatement(@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId,
@RequestParam("statement") String statement,
@RequestParam("position") int position) {
try {
return ResponseEntity.ok(wsFlinkSqlGatewayService.completeStatement(clusterId, sessionHandleId, statement, position));
} catch (Exception e) {
Expand All @@ -126,21 +155,26 @@ public ResponseEntity<List<String>> completeStatement(

@PostMapping("{clusterId}/{sessionHandleId}/addDependencies")
@Operation(summary = "添加依赖jar包", description = "添加依赖jar包")
public ResponseEntity<Boolean> addDependencies(
@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId,
@RequestParam("jarIdList") List<Long> jarIdList
) {
@Parameters({
@Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"),
@Parameter(name = "sessionHandleId", description = "openSession() 返回结果"),
@Parameter(name = "jarIdList", description = "jar ids")
})
public ResponseEntity<Boolean> addDependencies(@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId,
@RequestParam("jarIdList") List<Long> jarIdList) {
return ResponseEntity.ok(wsFlinkSqlGatewayService.addDependencies(clusterId, sessionHandleId, jarIdList));
}

@PostMapping("{clusterId}/{sessionHandleId}/addCatalog")
@Operation(summary = "添加catalog", description = "添加catalog")
public ResponseEntity<Boolean> addCatalog(
@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId,
@RequestBody WsFlinkSqlGatewayCreateCatalogParamsDTO params
) {
@Parameters({
@Parameter(name = "clusterId", description = "flink kubernetes session-cluster 的 sessionClusterId"),
@Parameter(name = "sessionHandleId", description = "openSession() 返回结果")
})
public ResponseEntity<Boolean> addCatalog(@PathVariable("clusterId") String clusterId,
@PathVariable("sessionHandleId") String sessionHandleId,
@RequestBody WsFlinkSqlGatewayCreateCatalogParamsDTO params) {
return ResponseEntity.ok(wsFlinkSqlGatewayService.addCatalog(clusterId, sessionHandleId, params.getCatalogName(), params.getOptions()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@
import cn.sliew.scaleph.common.dict.common.*;
import cn.sliew.scaleph.common.dict.ds.RedisMode;
import cn.sliew.scaleph.common.dict.flink.*;
import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentKind;
import cn.sliew.scaleph.common.dict.flink.kubernetes.DeploymentMode;
import cn.sliew.scaleph.common.dict.flink.kubernetes.ResourceLifecycleState;
import cn.sliew.scaleph.common.dict.flink.kubernetes.UpgradeMode;
import cn.sliew.scaleph.common.dict.flink.kubernetes.*;
import cn.sliew.scaleph.common.dict.image.ImagePullPolicy;
import cn.sliew.scaleph.common.dict.job.*;
import cn.sliew.scaleph.common.dict.seatunnel.*;
Expand Down Expand Up @@ -89,6 +86,8 @@ public enum DictType implements DictDefinition {
FLINK_KUBERNETES_DEPLOYMENT_KIND("deployment_kind", "Deployment 类型", DeploymentKind.class),
FLINK_KUBERNETES_RESOURCE_LIFECYCLE_STATE("resource_lifecycle_state", "Deployment 状态", ResourceLifecycleState.class),
FLINK_KUBERNETES_UPGRADE_MODE("upgrade_mode", "Upgrade 方式", UpgradeMode.class),
FLINK_KUBERNETES_SAVEPOINT_FORMAT_TYPE("savepoint_format_type", "Savepoint Format 类型", SavepointFormatType.class),
FLINK_KUBERNETES_SAVEPOINT_TRIGGER_TYPE("savepoint_trigger_type", "Savepoint Trigger 类型", SavepointTriggerType.class),

FLINK_CATALOG_COLUMN_TYPE("flink_catalog_column_type", "Flink Catalog Table Schema 列类型", CatalogColumnType.class),
FLINK_CATALOG_TABLE_KIND("flink_catalog_table_kind", "Flink Catalog Table 类型", CatalogTableKind.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.common.dict.flink.kubernetes;

import cn.sliew.scaleph.common.dict.DictInstance;
import com.baomidou.mybatisplus.annotation.EnumValue;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonFormat;

import java.util.Arrays;

/**
 * Dictionary enum for the format of a Flink savepoint (canonical vs. state-backend
 * native), plus an UNKNOWN fallback constant.
 *
 * <p>Serialized as an object (value/label/remark) via {@code @JsonFormat}; persisted
 * by its {@code value} column through the MyBatis-Plus {@code @EnumValue} mapping and
 * resolved back with the {@code @JsonCreator} factory {@link #of(String)}.
 */
@JsonFormat(shape = JsonFormat.Shape.OBJECT)
public enum SavepointFormatType implements DictInstance {

    CANONICAL("CANONICAL", "CANONICAL", "A canonical, common for all state backends format."),
    NATIVE("NATIVE", "NATIVE", "A format specific for the chosen state backend."),
    UNKNOWN("UNKNOWN", "UNKNOWN", "unknown"),
    ;

    /**
     * Resolves an enum constant from its persisted/serialized {@code value}.
     *
     * @param value the dictionary value to look up
     * @return the matching constant
     * @throws EnumConstantNotPresentException if no constant has the given value
     */
    @JsonCreator
    public static SavepointFormatType of(String value) {
        return Arrays.stream(values())
                .filter(instance -> instance.getValue().equals(value))
                .findAny().orElseThrow(() -> new EnumConstantNotPresentException(SavepointFormatType.class, value));
    }

    // final: enum constants are shared singletons and must stay immutable
    @EnumValue
    private final String value;
    private final String label;
    private final String remark;

    SavepointFormatType(String value, String label, String remark) {
        this.value = value;
        this.label = label;
        this.remark = remark;
    }

    @Override
    public String getValue() {
        return value;
    }

    @Override
    public String getLabel() {
        return label;
    }

    @Override
    public String getRemark() {
        return remark;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.common.dict.flink.kubernetes;

import cn.sliew.scaleph.common.dict.DictInstance;
import com.baomidou.mybatisplus.annotation.EnumValue;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonFormat;

import java.util.Arrays;

/**
 * Dictionary enum for how a Flink savepoint was triggered (manually, periodically by
 * the operator, or during a stateful upgrade), plus an UNKNOWN fallback constant.
 *
 * <p>Serialized as an object (value/label/remark) via {@code @JsonFormat}; persisted
 * by its {@code value} column through the MyBatis-Plus {@code @EnumValue} mapping and
 * resolved back with the {@code @JsonCreator} factory {@link #of(String)}.
 */
@JsonFormat(shape = JsonFormat.Shape.OBJECT)
public enum SavepointTriggerType implements DictInstance {

    MANUAL("MANUAL", "MANUAL", "Savepoint manually triggered by changing the savepointTriggerNonce."),
    PERIODIC("PERIODIC", "PERIODIC", "Savepoint periodically triggered by the operator."),
    UPGRADE("UPGRADE", "UPGRADE", "Savepoint triggered during stateful upgrade."),
    UNKNOWN("UNKNOWN", "UNKNOWN", "Savepoint trigger mechanism unknown, such as savepoint retrieved directly from Flink job."),
    ;

    /**
     * Resolves an enum constant from its persisted/serialized {@code value}.
     *
     * @param value the dictionary value to look up
     * @return the matching constant
     * @throws EnumConstantNotPresentException if no constant has the given value
     */
    @JsonCreator
    public static SavepointTriggerType of(String value) {
        return Arrays.stream(values())
                .filter(instance -> instance.getValue().equals(value))
                .findAny().orElseThrow(() -> new EnumConstantNotPresentException(SavepointTriggerType.class, value));
    }

    // final: enum constants are shared singletons and must stay immutable
    @EnumValue
    private final String value;
    private final String label;
    private final String remark;

    SavepointTriggerType(String value, String label, String remark) {
        this.value = value;
        this.label = label;
        this.remark = remark;
    }

    @Override
    public String getValue() {
        return value;
    }

    @Override
    public String getLabel() {
        return label;
    }

    @Override
    public String getRemark() {
        return remark;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.sliew.scaleph.dao.entity.master.ws;

import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointFormatType;
import cn.sliew.scaleph.common.dict.flink.kubernetes.SavepointTriggerType;
import cn.sliew.scaleph.dao.entity.BaseDO;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

/**
 * Persistent record of a savepoint taken for a Flink Kubernetes job instance.
 * Maps to table {@code ws_flink_kubernetes_job_instance_savepoint}; common audit
 * columns are inherited from {@code BaseDO}.
 */
@Data
@TableName("ws_flink_kubernetes_job_instance_savepoint")
public class WsFlinkKubernetesJobInstanceSavepoint extends BaseDO {

    private static final long serialVersionUID = 1L;

    // FK to the owning ws_flink_kubernetes_job_instance row
    @TableField("ws_flink_kubernetes_job_instance_id")
    private Long wsFlinkKubernetesJobInstanceId;

    // Savepoint trigger timestamp — NOTE(review): presumably epoch millis; confirm against the writer
    @TableField("time_stamp")
    private Long timeStamp;

    // Storage path/URI where the savepoint data lives
    @TableField("location")
    private String location;

    // How the savepoint was triggered (MANUAL / PERIODIC / UPGRADE / UNKNOWN)
    @TableField("trigger_type")
    private SavepointTriggerType triggerType;

    // Savepoint format (CANONICAL / NATIVE / UNKNOWN)
    @TableField("format_type")
    private SavepointFormatType formatType;

    // NOTE(review): Lombok @Data on a subclass generates equals/hashCode without callSuper;
    // consider @EqualsAndHashCode(callSuper = true) if BaseDO fields should participate in equality.
}
Loading

0 comments on commit cb3112b

Please sign in to comment.