Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][scaleph-workspace-seatunnel] upgrade flink、flink kubernetes operator and seatunnel #703

Merged
merged 9 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/release-manual-docker-seatunnel.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ on:
flinkVersion:
description: 'flink version'
required: true
default: '1.15'
default: '1.16'
type: choice
options:
- 1.15
- 1.16
env:
HUB: ghcr.io/flowerfine/scaleph-seatunnel
SEATUNNEL_VERSION: ${{ inputs.seatunnelVersion }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public enum FlinkVersion implements DictInstance {

V_1_18_0("1.18.0", "1.18.0"),
V_1_18_1("1.18.1", "1.18.1"),

V_1_19_0("1.19.0", "1.19.0"),
;

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,10 @@
public enum OperatorFlinkVersion implements DictInstance {

v1_15("v1_15", "v1_15"),

v1_16("v1_16", "v1_16"),

v1_17("v1_17", "v1_17"),

v1_18("v1_18", "v1_18"),
v1_19("v1_19", "v1_19"),
;

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ public enum WorkflowInstanceEvent implements DictInstance {
COMMAND_SHUTDOWN("1", "COMMAND_SHUTDOWN"),
COMMAND_SUSPEND("2", "COMMAND_SUSPEND"),
COMMAND_RESUME("3", "COMMAND_RESUME"),
PROCESS_SUCCESS("4", "PROCESS_SUCCESS"),
PROCESS_FAILURE("5", "PROCESS_FAILURE"),
PROCESS_TASK_CHANGE("4", "PROCESS_TASK_CHANGE"),
PROCESS_SUCCESS("5", "PROCESS_SUCCESS"),
PROCESS_FAILURE("6", "PROCESS_FAILURE"),
;

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class FlinkDeploymentSpec extends AbstractFlinkSpec {
/**
* Base pod template for job and task manager pods. Can be overridden by the jobManager and
* taskManager pod templates.
* fixme change to PodTemplateSpec
*/
private Pod podTemplate;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package cn.sliew.scaleph.engine.flink.kubernetes.operator.spec;

import io.fabric8.kubernetes.api.model.networking.v1.IngressTLS;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;
import java.util.Map;

/**
Expand All @@ -47,4 +49,14 @@ public class IngressSpec {
* Ingress annotations.
*/
private Map<String, String> annotations;

/**
* Ingress labels.
*/
private Map<String, String> labels;

/**
* Ingress tls.
*/
private List<IngressTLS> tls;
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ public class JobSpec implements Diffable<JobSpec> {
*/
private Boolean allowNonRestoredState;

/**
* Nonce used to trigger a full redeployment of the job from the savepoint path specified in
* initialSavepointPath. In order to trigger redeployment, change the number to a different
* non-null value. Rollback is not possible after redeployment.
*/
private Long savepointRedeployNonce;

@Override
public DiffResult<JobSpec> diff(JobSpec right) {
ReflectionDiffBuilder builder = new ReflectionDiffBuilder(this, right, ToStringStyle.DEFAULT_STYLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public abstract class CommonStatus<SPEC extends AbstractFlinkSpec> {
*/
private String error;

/** Last observed generation of the FlinkDeployment/FlinkSessionJob. */
private Long observedGeneration;

/**
* Lifecycle state of the Flink resource (including being rolled back, failed etc.).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,5 @@ public class JobStatus {
/**
* Information about pending and last checkpoint for the job.
*/
private Object checkpointInfo = new Object();
private CheckpointInfo checkpointInfo = new CheckpointInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ public enum FlinkImageMapping {
JAR_1_16(FlinkJobType.JAR, OperatorFlinkVersion.v1_16, FlinkVersionMapping.V_1_16, "flink:1.16"),
JAR_1_17(FlinkJobType.JAR, OperatorFlinkVersion.v1_17, FlinkVersionMapping.V_1_17, "flink:1.17"),
JAR_1_18(FlinkJobType.JAR, OperatorFlinkVersion.v1_18, FlinkVersionMapping.V_1_18, "flink:1.18"),
JAR_1_19(FlinkJobType.JAR, OperatorFlinkVersion.v1_19, FlinkVersionMapping.V_1_19, "flink:1.19"),

SQL_1_17(FlinkJobType.SQL, OperatorFlinkVersion.v1_17, FlinkVersionMapping.V_1_17, "ghcr.io/flowerfine/scaleph-sql-template:1.17"),
SQL_1_18(FlinkJobType.SQL, OperatorFlinkVersion.v1_18, FlinkVersionMapping.V_1_18, "ghcr.io/flowerfine/scaleph-sql-template:1.18"),

SEATUNNEL_1_15(FlinkJobType.SEATUNNEL, OperatorFlinkVersion.v1_15, FlinkVersionMapping.V_1_15, "ghcr.io/flowerfine/scaleph-seatunnel:2.3.4-flink-1.15"),
SEATUNNEL_1_16(FlinkJobType.SEATUNNEL, OperatorFlinkVersion.v1_16, FlinkVersionMapping.V_1_16, "ghcr.io/flowerfine/scaleph-seatunnel:2.3.4-flink-1.16"),
FLINK_CDC_1_18(FlinkJobType.FLINK_CDC, OperatorFlinkVersion.v1_18, FlinkVersionMapping.V_1_18, "ghcr.io/flowerfine/scaleph-flink-cdc:3.0.0-flink-1.18"),
;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
@Getter
public enum FlinkVersionMapping {

V_1_18(OperatorFlinkVersion.v1_18, FlinkVersion.V_1_18_0, FlinkVersion.V_1_18_1, FlinkVersion.V_1_18_0),
V_1_19(OperatorFlinkVersion.v1_19, FlinkVersion.V_1_19_0, FlinkVersion.V_1_19_0),
V_1_18(OperatorFlinkVersion.v1_18, FlinkVersion.V_1_18_1, FlinkVersion.V_1_18_1, FlinkVersion.V_1_18_0),
V_1_17(OperatorFlinkVersion.v1_17, FlinkVersion.V_1_17_2, FlinkVersion.V_1_17_2, FlinkVersion.V_1_17_1, FlinkVersion.V_1_17_0),
V_1_16(OperatorFlinkVersion.v1_16, FlinkVersion.V_1_16_3, FlinkVersion.V_1_16_3, FlinkVersion.V_1_16_2, FlinkVersion.V_1_16_1, FlinkVersion.V_1_16_0),
V_1_15(OperatorFlinkVersion.v1_15, FlinkVersion.V_1_15_4, FlinkVersion.V_1_15_4, FlinkVersion.V_1_15_3, FlinkVersion.V_1_15_2, FlinkVersion.V_1_15_1, FlinkVersion.V_1_15_0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.io.Serializable;
import java.util.Date;
Expand All @@ -28,10 +29,13 @@
* @author gleiyu
*/
@Data
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class BaseDTO implements Serializable {

private static final long serialVersionUID = -3170630380110141492L;

// 仅使用 id 作为 equals 和 hashcode 字段
@EqualsAndHashCode.Include
@Schema(description = "ID")
private Long id;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package cn.sliew.scaleph.workflow.listener.taskinstance;

import cn.sliew.scaleph.workflow.service.WorkflowTaskInstanceService;
import cn.sliew.scaleph.workflow.statemachine.WorkflowInstanceStateMachine;
import cn.sliew.scaleph.workflow.statemachine.WorkflowTaskInstanceStateMachine;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RScheduledExecutorService;
Expand All @@ -41,6 +42,8 @@ public abstract class AbstractWorkflowTaskInstanceEventListener implements Workf
@Autowired
protected WorkflowTaskInstanceService workflowTaskInstanceService;
@Autowired
protected WorkflowInstanceStateMachine workflowInstanceStateMachine;
@Autowired
protected WorkflowTaskInstanceStateMachine stateMachine;
@Autowired
private RedissonClient redissonClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package cn.sliew.scaleph.workflow.listener.taskinstance;

import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskInstanceDTO;
import org.springframework.stereotype.Component;

import java.io.Serializable;
Expand Down Expand Up @@ -45,6 +46,8 @@ public FailureRunner(Long workflowTaskInstanceId, Optional<Throwable> throwable)
@Override
public void run() {
workflowTaskInstanceService.updateFailure(workflowTaskInstanceId, throwable.orElse(null));
WorkflowTaskInstanceDTO workflowTaskInstanceDTO = workflowTaskInstanceService.get(workflowTaskInstanceId);
workflowInstanceStateMachine.onTaskChange(workflowTaskInstanceDTO.getWorkflowInstanceDTO());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package cn.sliew.scaleph.workflow.listener.taskinstance;

import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskInstanceDTO;
import org.springframework.stereotype.Component;

import java.io.Serializable;
Expand All @@ -42,6 +43,8 @@ public SuccessRunner(Long workflowTaskInstanceId) {
@Override
public void run() {
workflowTaskInstanceService.updateSuccess(workflowTaskInstanceId);
WorkflowTaskInstanceDTO workflowTaskInstanceDTO = workflowTaskInstanceService.get(workflowTaskInstanceId);
workflowInstanceStateMachine.onTaskChange(workflowTaskInstanceDTO.getWorkflowInstanceDTO());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@

package cn.sliew.scaleph.workflow.listener.workflowinstance;

import cn.sliew.milky.common.util.JacksonUtil;
import cn.sliew.scaleph.workflow.service.WorkflowInstanceService;
import cn.sliew.scaleph.workflow.service.WorkflowTaskDefinitionService;
import cn.sliew.scaleph.workflow.service.WorkflowTaskInstanceService;
import cn.sliew.scaleph.workflow.service.dto.WorkflowDefinitionDTO;
import cn.sliew.scaleph.workflow.service.dto.WorkflowInstanceDTO;
import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskDefinitionDTO;
import cn.sliew.scaleph.workflow.service.dto.WorkflowTaskInstanceDTO;
import com.google.common.graph.Graph;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.annotation.RInject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

@Slf4j
Expand All @@ -45,8 +45,6 @@ protected CompletableFuture handleEventAsync(WorkflowInstanceEventDTO event) {
future.whenCompleteAsync((unused, throwable) -> {
if (throwable != null) {
onFailure(event.getWorkflowInstanceId(), throwable);
} else {
stateMachine.onSuccess(workflowInstanceService.get(event.getWorkflowInstanceId()));
}
});
return future;
Expand Down Expand Up @@ -79,11 +77,16 @@ public void run() {
WorkflowInstanceDTO workflowInstanceDTO = workflowInstanceService.get(event.getWorkflowInstanceId());
WorkflowDefinitionDTO workflowDefinitionDTO = workflowInstanceDTO.getWorkflowDefinition();

List<WorkflowTaskDefinitionDTO> workflowTaskDefinitionDTOS = workflowTaskDefinitionService.list(workflowDefinitionDTO.getId());
List<WorkflowTaskInstanceDTO> workflowTaskInstanceDTOS = new ArrayList<>(workflowTaskDefinitionDTOS.size());
// todo 应该是找到 root 节点,批量启动 root 节点
for (WorkflowTaskDefinitionDTO workflowTaskDefinitionDTO : workflowTaskDefinitionDTOS) {
workflowTaskInstanceDTOS.add(workflowTaskInstanceService.deploy(workflowTaskDefinitionDTO.getId(), event.getWorkflowInstanceId()));
// 找到 root 节点,批量启动 root 节点
Graph<WorkflowTaskDefinitionDTO> dag = workflowTaskDefinitionService.getDag(workflowDefinitionDTO.getId());
Set<WorkflowTaskDefinitionDTO> nodes = dag.nodes();
for (WorkflowTaskDefinitionDTO workflowTaskDefinitionDTO : nodes) {
System.out.println("验证是否需要启动 root 节点: " + JacksonUtil.toJsonString(workflowTaskDefinitionDTO));
// root 节点
if (dag.inDegree(workflowTaskDefinitionDTO) == 0) {
workflowTaskInstanceService.deploy(workflowTaskDefinitionDTO.getId(), event.getWorkflowInstanceId());
System.out.println("真正启动 root 节点: " + JacksonUtil.toJsonString(workflowTaskDefinitionDTO));
}
}
// todo 循环检测 workflowTaskInstanceDTOS 状态或接收 workflowTaskInstance 事件,判断是否成功或失败
}
Expand Down
Loading
Loading