Commit d35642e: rewrite flow dag with antv x6

iisquare committed Jun 14, 2022 (1 parent: 236c1ee)

Showing 77 changed files with 2,302 additions and 587 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -6,6 +6,7 @@
**/*.log
**/__pycache__
**/*.zip
!.gitkeep

/docker/.env
/docker/runtime
2 changes: 2 additions & 0 deletions docker/README.md
@@ -96,3 +96,5 @@ rm -rf ./runtime/mongo
- [Define a command and arguments for a container](https://kubernetes.io/zh/docs/tasks/inject-data-application/define-command-argument-container/)
- [Establishing links between containers with docker-compose](https://www.jianshu.com/p/1e80c2866a9d)
- [Docker run reference VOLUME (shared filesystems)](https://docs.docker.com/engine/reference/run/#volume-shared-filesystems)
- [Segmentation fault when run old debian containers if docker host is debian10(buster)](https://stackoverflow.com/questions/57807835/segmentation-fault-when-run-old-debian-containers-if-docker-host-is-debian10bus)
- [Enable vsyscall=emulate in the kernel config to run older base images such as Centos 6](https://github.com/microsoft/WSL/issues/4694)
1 change: 1 addition & 0 deletions docker/docker-compose.yml
@@ -39,6 +39,7 @@ services:
- "${BIND_ADDRESS}${BIND_COLON}${POSTGRES_PORT}:5432"
volumes:
- ${DATA_DIR}/postgres/data:/var/lib/postgresql/data
- ${DATA_DIR}/postgres/archive:/var/lib/postgresql/archive
openresty:
build:
context: ./service/openresty
4 changes: 2 additions & 2 deletions docker/env-example
@@ -1,7 +1,7 @@
DATA_DIR=/data/runtime

-BIND_COLON=:
-BIND_ADDRESS=127.0.0.1
+BIND_COLON=
+BIND_ADDRESS=

ZOOKEEPER_VERSION=3.5.8
ZOOKEEPER_CLIENT_PORT=2181
5 changes: 0 additions & 5 deletions docker/service/fastdfs/Dockerfile

This file was deleted.

4 changes: 0 additions & 4 deletions docker/service/fastdfs/README.md

This file was deleted.

4 changes: 4 additions & 0 deletions docker/service/postgres/Dockerfile
@@ -2,4 +2,8 @@ ARG POSTGRES_VERSION

FROM postgres:${POSTGRES_VERSION}

ADD postgresql.conf /etc/postgresql/postgresql.conf

EXPOSE 5432

CMD ["postgres", "-c", "config_file=/etc/postgresql/postgresql.conf"]
8 changes: 8 additions & 0 deletions docker/service/postgres/postgresql.conf
@@ -0,0 +1,8 @@
# docker run -i --rm postgres cat /usr/share/postgresql/postgresql.conf.sample

listen_addresses = '*'

wal_level = logical

archive_mode = on
archive_command = 'cp %p /var/lib/postgresql/archive/%f'
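
As a quick sanity check that the container actually picked up this file, the settings can be queried over JDBC. The sketch below is illustrative and not part of this commit: it assumes the PostgreSQL JDBC driver on the classpath and the same local credentials used by the CDC tests further down.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PostgresConfigCheck {
    public static void main(String[] args) throws Exception {
        // hypothetical local connection; mirrors the host/user/password in pgTest below
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://127.0.0.1:5432/postgres", "postgres", "admin888");
             Statement stmt = conn.createStatement()) {
            // wal_level=logical enables logical decoding for CDC;
            // archive_mode/archive_command control WAL archiving into the mounted volume
            for (String name : new String[]{"wal_level", "archive_mode", "archive_command"}) {
                try (ResultSet rs = stmt.executeQuery("SHOW " + name)) {
                    rs.next();
                    System.out.println(name + " = " + rs.getString(1));
                }
            }
        }
    }
}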
2 changes: 2 additions & 0 deletions java/.gitignore
@@ -10,3 +10,5 @@
/web/xlab/libs/x86/
/web/xlab/libs/*.jar
/web/file/uploads

/ext/flink/checkpoints/**/
1 change: 1 addition & 0 deletions java/app/flink/build.gradle
@@ -12,6 +12,7 @@ dependencies {
compile group: 'org.apache.flink', name: 'flink-connector-elasticsearch7_' + scalaVersion, version: flinkVersion

compile group: 'com.ververica', name: 'flink-connector-mysql-cdc', version: cdcVersion
compile group: 'com.ververica', name: 'flink-connector-postgres-cdc', version: cdcVersion

testCompile group: 'org.apache.flink', name: 'flink-runtime-web_' + scalaVersion, version: flinkVersion
}
@@ -4,6 +4,7 @@
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
@@ -67,4 +68,41 @@ public void cdcTest() throws Exception {
env.execute("cdc-test");
}

@Test
public void pgTest() throws Exception {
Configuration config = new Configuration();
config.setString(RestOptions.BIND_PORT,"8080-8089");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
env.enableCheckpointing(1000);
PostgreSQLSource.Builder<String> builder = PostgreSQLSource.<String>builder()
.hostname("127.0.0.1").username("postgres").password("admin888")
.database("postgres").schemaList("public").tableList(".*")
.decodingPluginName("pgoutput") // avoids: could not access file "decoderbufs": No such file or directory
.deserializer(new JsonDebeziumDeserializationSchema());
builder.debeziumProperties(new Properties(){{
/**
 * PostgreSQL's LSN is kept on the primary database; once checkpointing is
 * enabled it is confirmed there, and this is unaffected by fromSavepoint.
 * Drop the slot:  select * from pg_drop_replication_slot('flink');
 * List the slots: select * from pg_replication_slots;
 * @see(https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html)
 */
put("snapshot.mode", "never"); // initial, always, never, initial_only, exported
}});
DataStreamSource<String> source = env.addSource(builder.build());
source.print();
env.execute("pg-test");
}

@Test
public void restTest() throws Exception {
Configuration config = new Configuration();
config.setString(RestOptions.BIND_PORT,"8081");
config.setInteger("taskmanager.numberOfTaskSlots", 3);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
// nc -l 8888
DataStreamSource<String> source = env.socketTextStream("127.0.0.1", 8888);
source.print().setParallelism(1);
env.execute("rest-test");
}

}
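
Between runs of these tests, the connector's replication slot (named flink by default, matching the pg_drop_replication_slot('flink') call in the comment above) can linger and pin WAL on the server. A minimal cleanup sketch, again assuming the PostgreSQL JDBC driver and the same local credentials; not part of this commit:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SlotCleanup {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://127.0.0.1:5432/postgres", "postgres", "admin888");
             Statement stmt = conn.createStatement()) {
            // list existing slots; an inactive slot still blocks WAL recycling
            try (ResultSet rs = stmt.executeQuery(
                    "select slot_name, active from pg_replication_slots")) {
                while (rs.next()) {
                    System.out.println(rs.getString("slot_name") + " active=" + rs.getBoolean("active"));
                }
            }
            // drop the connector's slot once the job is stopped for good
            stmt.execute("select pg_drop_replication_slot('flink')");
        }
    }
}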
@@ -1,6 +1,9 @@
package com.iisquare.fs.base.dag.core;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.iisquare.fs.base.core.util.DPUtil;

import java.io.Serializable;
import java.util.*;
@@ -25,6 +28,7 @@ public Map<String, DAGNode> items(JsonNode items) throws Exception {
while (iterator.hasNext()) {
JsonNode item = iterator.next();
String type = item.at("/type").asText();
if (Arrays.asList("SubprocessLayout").contains(type)) continue;
DAGNode node = this.nodes.get(type).newInstance();
node.setId(item.at("/id").asText());
if (node instanceof DAGSource) {
@@ -50,8 +54,8 @@ public Map<String, DAGNode> relations(Map<String, DAGNode> items, JsonNode relat
Iterator<JsonNode> iterator = relations.iterator();
while (iterator.hasNext()) {
JsonNode item = iterator.next();
- DAGNode source = items.get(item.get("sourceId").asText());
- DAGNode target = items.get(item.get("targetId").asText());
+ DAGNode source = items.get(item.get("source").asText());
+ DAGNode target = items.get(item.get("target").asText());
source.getTargets().add(target);
target.getSources().add(source);
}
@@ -128,9 +132,106 @@ private boolean recursive(Map<String, DAGNode> nodeMap) throws Exception {
return !list.isEmpty();
}

public JsonNode edges(JsonNode nodes, JsonNode edges) {
/**
 * Find all subprocesses and index them as:
 * {
 *     processId: {
 *         children: {
 *             childId: {}
 *         },
 *         incoming: { // edge.target
 *             childId: {}
 *         },
 *         outgoing: { // edge.source
 *             childId: {}
 *         }
 *     }
 * }
 */
ObjectNode processes = DPUtil.objectNode();
Iterator<JsonNode> iterator = nodes.iterator();
while (iterator.hasNext()) {
JsonNode node = iterator.next();
if (!"SubprocessLayout".equals(node.at("/type").asText())) continue;
ObjectNode process = processes.putObject(node.at("/id").asText());
process.putObject("children"); // 全部子节点
process.putObject("incoming"); // 入度为零的节点
process.putObject("outgoing"); // 出度为零的节点
}
/**
 * Collect the nodes belonging to each subprocess.
 */
iterator = nodes.iterator();
while (iterator.hasNext()) {
JsonNode node = iterator.next();
String parent = node.at("/parent").asText();
if (!processes.has(parent)) continue;
ObjectNode children = (ObjectNode) processes.at("/" + parent + "/children");
children.putObject(node.at("/id").asText());
}
/**
 * Index edges by their endpoints:
 * sources -> {
 *     sourceId: [{}]
 * }
 * targets -> {
 *     targetId: [{}]
 * }
 */
ObjectNode sources = DPUtil.objectNode();
ObjectNode targets = DPUtil.objectNode();
iterator = edges.iterator();
while (iterator.hasNext()) {
JsonNode edge = iterator.next();
String source = edge.at("/source").asText();
String target = edge.at("/target").asText();
(sources.has(source) ? (ArrayNode) sources.get(source) : sources.putArray(source)).add(edge);
(targets.has(target) ? (ArrayNode) targets.get(target) : targets.putArray(target)).add(edge);
}
/**
 * Within each subprocess, find the children with zero in-degree and zero out-degree.
 */
iterator = processes.iterator();
while (iterator.hasNext()) {
JsonNode process = iterator.next();
ObjectNode incoming = (ObjectNode) process.at("/incoming");
ObjectNode outgoing = (ObjectNode) process.at("/outgoing");
Iterator<Map.Entry<String, JsonNode>> it = process.at("/children").fields();
while (it.hasNext()) {
String key = it.next().getKey();
if (!sources.has(key)) outgoing.putObject(key);
if (!targets.has(key)) incoming.putObject(key);
}
}
/**
 * Expand subprocess endpoints into their boundary child nodes.
 */
ArrayNode result = DPUtil.arrayNode();
iterator = edges.iterator();
while (iterator.hasNext()) {
JsonNode edge = iterator.next();
String source = edge.at("/source").asText();
String target = edge.at("/target").asText();
Iterator<Map.Entry<String, JsonNode>> sit = null; // 来源 -> processes[processId].outgoing
Iterator<Map.Entry<String, JsonNode>> tit = null; // 目标 -> processes[processId].incoming
sit = (processes.has(source) ? processes.get(source).get("outgoing") : DPUtil.objectNode().put(source, "")).fields();
tit = (processes.has(target) ? processes.get(target).get("incoming") : DPUtil.objectNode().put(target, "")).fields();
while (sit.hasNext()) {
String sk = sit.next().getKey();
while (tit.hasNext()) {
String tk = tit.next().getKey();
result.addObject().put("source", sk).put("target", tk);
}
}
}
return result;
}
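
For illustration, a small standalone sketch (not part of this commit) of the payload shapes edges() consumes, using plain Jackson and made-up node ids; the comments spell out the expected expansion:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class EdgesDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // a subprocess "sub" wrapping "b" and "c"; plain nodes "a" and "d"
        JsonNode nodes = mapper.readTree("[" +
                "{\"id\":\"a\",\"type\":\"Source\"}," +
                "{\"id\":\"sub\",\"type\":\"SubprocessLayout\"}," +
                "{\"id\":\"b\",\"type\":\"Transform\",\"parent\":\"sub\"}," +
                "{\"id\":\"c\",\"type\":\"Transform\",\"parent\":\"sub\"}," +
                "{\"id\":\"d\",\"type\":\"Sink\"}]");
        // edges may point at the subprocess itself; b -> c stays inside it
        JsonNode edges = mapper.readTree("[" +
                "{\"source\":\"a\",\"target\":\"sub\"}," +
                "{\"source\":\"b\",\"target\":\"c\"}," +
                "{\"source\":\"sub\",\"target\":\"d\"}]");
        // edges(nodes, edges) would rewrite the subprocess endpoints:
        // a -> sub  becomes a -> b (b has zero in-degree inside sub),
        // sub -> d  becomes c -> d (c has zero out-degree inside sub),
        // b -> c    passes through unchanged
        System.out.println(nodes);
        System.out.println(edges);
    }
}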

public void execute() throws Exception {
Map<String, DAGNode> items = items(diagram.at("/items"));
Map<String, DAGNode> nodes = configure(relations(items, diagram.at("/relations")));
Map<String, DAGNode> items = items(diagram.at("/nodes"));
JsonNode edges = edges(diagram.at("/nodes"), diagram.at("/edges"));
Map<String, DAGNode> nodes = configure(relations(items, edges));
// 查找入度为零的节点并执行
while(recursive(nodes)) {}
}
4 changes: 4 additions & 0 deletions java/ext/build.gradle
@@ -0,0 +1,4 @@

subprojects {
archivesBaseName = rootProject.name + '-ext-' + name
}
File renamed without changes.
Empty file.
4 changes: 4 additions & 0 deletions java/ext/flink/conf/flink-conf.yaml
@@ -0,0 +1,4 @@
jobmanager.rpc.address: 127.0.0.1
jobmanager.rpc.port: 6123
rest.address: 127.0.0.1
rest.port: 8081
@@ -1,4 +1,4 @@
-package com.iisquare.fs.plugins.flink;
+package com.iisquare.fs.ext.flink;

import com.iisquare.fs.app.flink.core.FlinkRunner;
import com.iisquare.fs.base.dag.core.DAGTransform;
@@ -0,0 +1,24 @@
package com.iisquare.fs.ext.flink.cli;

import com.iisquare.fs.ext.flink.job.CDCJob;
import com.iisquare.fs.ext.flink.util.CLIUtil;
import org.apache.flink.client.cli.CliFrontend;

import java.util.Arrays;
import java.util.List;

public class CDCSubmitter {


public static void main(String[] args) {
System.out.println("Edit Environment FLINK_CONF_DIR=" + CLIUtil.path() + "conf");
List<String> params = Arrays.asList(
"run",
"-c",
CDCJob.class.getName(),
CLIUtil.path() + "build/libs/fs-project-ext-flink-0.0.1-SNAPSHOT.jar"
);
CliFrontend.main(params.toArray(new String[0]));
}

}
@@ -0,0 +1,37 @@
package com.iisquare.fs.ext.flink.job;

import com.iisquare.fs.ext.flink.util.CLIUtil;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class CDCJob {

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
env.setStateBackend(new HashMapStateBackend());
CheckpointConfig checkpoint = env.getCheckpointConfig();
checkpoint.setCheckpointStorage("file://" + CLIUtil.path() + "checkpoints");
checkpoint.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpoint.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
PostgreSQLSource.Builder<String> builder = PostgreSQLSource.<String>builder()
.hostname("127.0.0.1").username("postgres").password("admin888")
.database("postgres").schemaList("public").tableList(".*")
.decodingPluginName("pgoutput") // avoids: could not access file "decoderbufs": No such file or directory
.deserializer(new JsonDebeziumDeserializationSchema());
builder.debeziumProperties(new Properties(){{
put("snapshot.mode", args[0]);
}});
DataStreamSource<String> source = env.addSource(builder.build());
source.print();
env.execute("cdc-job");
}

}
@@ -0,0 +1,15 @@
package com.iisquare.fs.ext.flink.util;

import com.iisquare.fs.ext.flink.job.CDCJob;

public class CLIUtil {

public static String path () {
String path = CDCJob.class.getProtectionDomain().getCodeSource().getLocation().getPath();
int index = path.indexOf("/java/ext/flink/");
if (-1 == index) throw new RuntimeException("path error:" + path);
path = path.substring(0, index) + "/java/ext/flink/";
return path;
}

}
@@ -1,4 +1,4 @@
-package com.iisquare.fs.plugins.flink.tester;
+package com.iisquare.fs.ext.flink.tester;

import com.iisquare.fs.app.flink.FlinkApplication;
import org.junit.Test;
@@ -0,0 +1,29 @@
package com.iisquare.fs.ext.flink.tester;

import com.iisquare.fs.ext.flink.util.CLIUtil;
import com.iisquare.fs.ext.flink.job.CDCJob;
import org.apache.flink.client.cli.CliFrontend;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;

public class SubmitterTest {

@Test
public void cdcTest() {
System.out.println("Environment FLINK_CONF_DIR=" + CLIUtil.path() + "conf");
List<String> params = Arrays.asList(
"run",
// "--fromSavepoint",
// "D:\\htdocs\\fs-project-vip\\java\\ext\\flink\\checkpoints\\7eab743ad591b2bbd3f6046384874d7a\\chk-0",
// "--allowNonRestoredState", // 当作业发生变更时,允许保留匹配节点
"-c",
CDCJob.class.getName(),
CLIUtil.path() + "build/libs/fs-project-ext-flink-0.0.1-SNAPSHOT.jar",
"never"
);
CliFrontend.main(params.toArray(new String[0]));
}

}
File renamed without changes.
@@ -1,4 +1,4 @@
-package com.iisquare.fs.plugins.spark;
+package com.iisquare.fs.ext.spark;

import com.iisquare.fs.base.dag.core.DAGTransform;
