From d35642ee827df4858d22f6607e80bb1a31113535 Mon Sep 17 00:00:00 2001 From: iisquare Date: Tue, 14 Jun 2022 15:45:51 +0800 Subject: [PATCH] rewrite flow dag with antv x6 --- .gitignore | 1 + docker/README.md | 2 + docker/docker-compose.yml | 1 + docker/env-example | 4 +- docker/service/fastdfs/Dockerfile | 5 - docker/service/fastdfs/README.md | 4 - docker/service/postgres/Dockerfile | 4 + docker/service/postgres/postgresql.conf | 8 + java/.gitignore | 2 + java/app/flink/build.gradle | 1 + .../fs/app/flink/tester/FlinkTester.java | 38 ++ .../iisquare/fs/base/dag/core/DAGRunner.java | 109 +++++- java/ext/build.gradle | 4 + java/{plugins => ext}/flink/build.gradle | 0 java/ext/flink/checkpoints/.gitkeep | 0 java/ext/flink/conf/flink-conf.yaml | 4 + .../iisquare/fs/ext}/flink/TestTransform.java | 2 +- .../fs/ext/flink/cli/CDCSubmitter.java | 24 ++ .../com/iisquare/fs/ext/flink/job/CDCJob.java | 37 ++ .../iisquare/fs/ext/flink/util/CLIUtil.java | 15 + .../fs/ext}/flink/tester/DAGTester.java | 2 +- .../fs/ext/flink/tester/SubmitterTest.java | 29 ++ java/{plugins => ext}/spark/build.gradle | 0 .../iisquare/fs/ext}/spark/TestTransform.java | 2 +- .../fs/ext}/spark/tester/DAGTester.java | 2 +- java/plugins/build.gradle | 4 - java/settings.gradle | 4 +- .../fs/web/bi/service/DAGService.java | 80 ++-- static/README.md | 29 ++ static/admin/package.json | 3 + .../src/assets/icons/action/action-fit.svg | 6 + static/admin/src/assets/icons/action/index.js | 4 +- .../admin/src/assets/icons/workflow/index.js | 4 +- .../assets/icons/workflow/workflow-group.svg | 16 +- .../icons/workflow/workflow-subprocess.svg | 4 + static/admin/src/components/X6/FlowEdge.js | 15 + static/admin/src/components/X6/FlowGroup.js | 151 +++++++ static/admin/src/components/X6/FlowNode.vue | 66 ++++ .../admin/src/components/X6/FlowSubprocess.js | 108 +++++ static/admin/src/components/X6/flow.js | 370 ++++++++++++++++++ static/admin/src/components/X6/flow.less | 77 ++++ static/admin/src/components/X6/index.js | 73 ++++ .../bi/diagram/design/APIConfigProperty.vue | 41 +- .../design/AnchorTransformProperty.vue | 31 +- .../design/CalendarOffsetConfigProperty.vue | 47 ++- .../bi/diagram/design/CanvasProperty.vue | 29 +- .../bi/diagram/design/ConsoleSinkProperty.vue | 29 +- .../diagram/design/ConsulConfigProperty.vue | 25 +- .../design/ConvertTransformProperty.vue | 31 +- .../design/DateFormatConfigProperty.vue | 41 +- .../design/DateFormatTransformProperty.vue | 41 +- .../design/DateGenerateConfigProperty.vue | 41 +- .../design/DateParseTransformProperty.vue | 41 +- .../bi/diagram/design/DefaultProperty.vue | 21 +- .../views/bi/diagram/design/EdgeProperty.vue | 70 ++++ .../design/ElasticsearchSinkProperty.vue | 61 +-- .../design/ElasticsearchSourceProperty.vue | 47 ++- .../bi/diagram/design/FileSourceProperty.vue | 29 +- .../diagram/design/ImportConfigProperty.vue | 25 +- .../bi/diagram/design/JDBCSourceProperty.vue | 67 ++-- .../bi/diagram/design/JSONConfigProperty.vue | 27 +- .../design/JSONParseTransformProperty.vue | 29 +- .../design/JSONStringifyTransformProperty.vue | 29 +- .../bi/diagram/design/KafkaSourceProperty.vue | 45 ++- .../bi/diagram/design/MergeConfigProperty.vue | 33 +- .../bi/diagram/design/MongoSinkProperty.vue | 53 ++- .../diagram/design/MySQLCaptureProperty.vue | 57 +-- .../design/NumberGenerateConfigProperty.vue | 41 +- .../src/views/bi/diagram/design/Property.vue | 24 +- .../design/RegularTransformProperty.vue | 33 +- .../diagram/design/SQLTransformProperty.vue | 27 +- .../design/ScriptTransformProperty.vue | 29 +- .../views/bi/diagram/design/SliceBasic.vue | 39 +- .../src/views/bi/diagram/design/config.js | 175 +++++++-- static/admin/src/views/bi/diagram/model.vue | 155 +++----- static/admin/vue.config.js | 3 +- static/admin/yarn.lock | 59 +++ 77 files changed, 2302 insertions(+), 587 deletions(-) delete mode 100644 docker/service/fastdfs/Dockerfile delete mode 100644 docker/service/fastdfs/README.md create mode 100644 docker/service/postgres/postgresql.conf create mode 100644 java/ext/build.gradle rename java/{plugins => ext}/flink/build.gradle (100%) create mode 100644 java/ext/flink/checkpoints/.gitkeep create mode 100644 java/ext/flink/conf/flink-conf.yaml rename java/{plugins/flink/src/main/java/com/iisquare/fs/plugins => ext/flink/src/main/java/com/iisquare/fs/ext}/flink/TestTransform.java (91%) create mode 100644 java/ext/flink/src/main/java/com/iisquare/fs/ext/flink/cli/CDCSubmitter.java create mode 100644 java/ext/flink/src/main/java/com/iisquare/fs/ext/flink/job/CDCJob.java create mode 100644 java/ext/flink/src/main/java/com/iisquare/fs/ext/flink/util/CLIUtil.java rename java/{plugins/flink/src/test/java/com/iisquare/fs/plugins => ext/flink/src/test/java/com/iisquare/fs/ext}/flink/tester/DAGTester.java (85%) create mode 100644 java/ext/flink/src/test/java/com/iisquare/fs/ext/flink/tester/SubmitterTest.java rename java/{plugins => ext}/spark/build.gradle (100%) rename java/{plugins/spark/src/main/java/com/iisquare/fs/plugins => ext/spark/src/main/java/com/iisquare/fs/ext}/spark/TestTransform.java (83%) rename java/{plugins/spark/src/test/java/com/iisquare/fs/plugins => ext/spark/src/test/java/com/iisquare/fs/ext}/spark/tester/DAGTester.java (85%) delete mode 100644 java/plugins/build.gradle create mode 100644 static/admin/src/assets/icons/action/action-fit.svg create mode 100644 static/admin/src/assets/icons/workflow/workflow-subprocess.svg create mode 100644 static/admin/src/components/X6/FlowEdge.js create mode 100644 static/admin/src/components/X6/FlowGroup.js create mode 100644 static/admin/src/components/X6/FlowNode.vue create mode 100644 static/admin/src/components/X6/FlowSubprocess.js create mode 100644 static/admin/src/components/X6/flow.js create mode 100644 static/admin/src/components/X6/flow.less create mode 100644 static/admin/src/components/X6/index.js create mode 100644 static/admin/src/views/bi/diagram/design/EdgeProperty.vue diff --git a/.gitignore b/.gitignore index 6e123c11..0a593bcd 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ **/*.log **/__pycache__ **/*.zip +!.gitkeep /docker/.env /docker/runtime diff --git a/docker/README.md b/docker/README.md index f3ac4b3b..5db84083 100644 --- a/docker/README.md +++ b/docker/README.md @@ -96,3 +96,5 @@ rm -rf ./runtime/mongo - [为容器设置启动时要执行的命令和参数](https://kubernetes.io/zh/docs/tasks/inject-data-application/define-command-argument-container/) - [docker-compose建立容器之间的连接关系](https://www.jianshu.com/p/1e80c2866a9d) - [Docker run reference VOLUME (shared filesystems)](https://docs.docker.com/engine/reference/run/#volume-shared-filesystems) +- [Segmentation fault when run old debian containers if docker host is debian10(buster)](https://stackoverflow.com/questions/57807835/segmentation-fault-when-run-old-debian-containers-if-docker-host-is-debian10bus) +- [Enable vsyscall=emulate in the kernel config to run older base images such as Centos 6](https://github.com/microsoft/WSL/issues/4694) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 6c382685..b3cbf82f 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -39,6 +39,7 @@ services: - "${BIND_ADDRESS}${BIND_COLON}${POSTGRES_PORT}:5432" volumes: - ${DATA_DIR}/postgres/data:/var/lib/postgresql/data + - ${DATA_DIR}/postgres/archive:/var/lib/postgresql/archive openresty: build: context: ./service/openresty diff --git a/docker/env-example b/docker/env-example index f65c7908..2c853b7d 100644 --- a/docker/env-example +++ b/docker/env-example @@ -1,7 +1,7 @@ DATA_DIR=/data/runtime -BIND_COLON=: -BIND_ADDRESS=127.0.0.1 +BIND_COLON= +BIND_ADDRESS= ZOOKEEPER_VERSION=3.5.8 ZOOKEEPER_CLIENT_PORT=2181 diff --git a/docker/service/fastdfs/Dockerfile b/docker/service/fastdfs/Dockerfile deleted file mode 100644 index ab8d1c98..00000000 --- a/docker/service/fastdfs/Dockerfile +++ /dev/null @@ -1,5 +0,0 @@ -ARG FASTDFS_VERSION - -FROM season/fastdfs:${FASTDFS_VERSION} - -EXPOSE 22122 diff --git a/docker/service/fastdfs/README.md b/docker/service/fastdfs/README.md deleted file mode 100644 index 2797c940..00000000 --- a/docker/service/fastdfs/README.md +++ /dev/null @@ -1,4 +0,0 @@ - -## 参考 -- [Segmentation fault when run old debian containers if docker host is debian10(buster)](https://stackoverflow.com/questions/57807835/segmentation-fault-when-run-old-debian-containers-if-docker-host-is-debian10bus) -- [Enable vsyscall=emulate in the kernel config to run older base images such as Centos 6](https://github.com/microsoft/WSL/issues/4694) diff --git a/docker/service/postgres/Dockerfile b/docker/service/postgres/Dockerfile index af8522ff..5f42f542 100644 --- a/docker/service/postgres/Dockerfile +++ b/docker/service/postgres/Dockerfile @@ -2,4 +2,8 @@ ARG POSTGRES_VERSION FROM postgres:${POSTGRES_VERSION} +ADD postgresql.conf /etc/postgresql/postgresql.conf + EXPOSE 5432 + +CMD ["postgres", "-c", "config_file=/etc/postgresql/postgresql.conf"] diff --git a/docker/service/postgres/postgresql.conf b/docker/service/postgres/postgresql.conf new file mode 100644 index 00000000..2c96b98b --- /dev/null +++ b/docker/service/postgres/postgresql.conf @@ -0,0 +1,8 @@ +# docker run -i --rm postgres cat /usr/share/postgresql/postgresql.conf.sample + +listen_addresses = '*' + +wal_level = logical + +archive_mode = on +archive_command = 'cp %p /var/lib/postgresql/archive/%f' diff --git a/java/.gitignore b/java/.gitignore index 6f4688f3..9e456cfb 100644 --- a/java/.gitignore +++ b/java/.gitignore @@ -10,3 +10,5 @@ /web/xlab/libs/x86/ /web/xlab/libs/*.jar /web/file/uploads + +/ext/flink/checkpoints/**/ diff --git a/java/app/flink/build.gradle b/java/app/flink/build.gradle index adc26205..38ac5dba 100644 --- a/java/app/flink/build.gradle +++ b/java/app/flink/build.gradle @@ -12,6 +12,7 @@ dependencies { compile group: 'org.apache.flink', name: 'flink-connector-elasticsearch7_' + scalaVersion, version: flinkVersion compile group: 'com.ververica', name: 'flink-connector-mysql-cdc', version: cdcVersion + compile group: 'com.ververica', name: 'flink-connector-postgres-cdc', version: cdcVersion testCompile group: 'org.apache.flink', name: 'flink-runtime-web_' + scalaVersion, version: flinkVersion } diff --git a/java/app/flink/src/test/java/com/iisquare/fs/app/flink/tester/FlinkTester.java b/java/app/flink/src/test/java/com/iisquare/fs/app/flink/tester/FlinkTester.java index 693a5e15..6fcfc9bb 100644 --- a/java/app/flink/src/test/java/com/iisquare/fs/app/flink/tester/FlinkTester.java +++ b/java/app/flink/src/test/java/com/iisquare/fs/app/flink/tester/FlinkTester.java @@ -4,6 +4,7 @@ import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder; import com.ververica.cdc.connectors.mysql.table.StartupOptions; +import com.ververica.cdc.connectors.postgres.PostgreSQLSource; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringSchema; @@ -67,4 +68,41 @@ public void cdcTest() throws Exception { env.execute("cdc-test"); } + @Test + public void pgTest() throws Exception { + Configuration config = new Configuration(); + config.setString(RestOptions.BIND_PORT,"8080-8089"); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config); + env.enableCheckpointing(1000); + PostgreSQLSource.Builder builder = PostgreSQLSource.builder() + .hostname("127.0.0.1").username("postgres").password("admin888") + .database("postgres").schemaList("public").tableList(".*") + .decodingPluginName("pgoutput") // could not access file "decoderbufs": No such file or directory + .deserializer(new JsonDebeziumDeserializationSchema()); + builder.debeziumProperties(new Properties(){{ + /** + * Postgresql的LSN保存在主数据库中,启用Checkpoint后即可confirm,且不受fromSavepoint影响 + * 清除: select * from pg_drop_replication_slot('flink'); + * 查询: select * from pg_replication_slots; + * @see(https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html) + */ + put("snapshot.mode", "never"); // initial, always, never, initial_only, exported + }}); + DataStreamSource source = env.addSource(builder.build()); + source.print(); + env.execute("pg-test"); + } + + @Test + public void restTest() throws Exception { + Configuration config = new Configuration(); + config.setString(RestOptions.BIND_PORT,"8081"); + config.setInteger("taskmanager.numberOfTaskSlots", 3); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config); + // nc -l 8888 + DataStreamSource source = env.socketTextStream("127.0.0.1", 8888); + source.print().setParallelism(1); + env.execute("rest-test"); + } + } diff --git a/java/base/dag/src/main/java/com/iisquare/fs/base/dag/core/DAGRunner.java b/java/base/dag/src/main/java/com/iisquare/fs/base/dag/core/DAGRunner.java index bda1218c..523679cb 100644 --- a/java/base/dag/src/main/java/com/iisquare/fs/base/dag/core/DAGRunner.java +++ b/java/base/dag/src/main/java/com/iisquare/fs/base/dag/core/DAGRunner.java @@ -1,6 +1,9 @@ package com.iisquare.fs.base.dag.core; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.iisquare.fs.base.core.util.DPUtil; import java.io.Serializable; import java.util.*; @@ -25,6 +28,7 @@ public Map items(JsonNode items) throws Exception { while (iterator.hasNext()) { JsonNode item = iterator.next(); String type = item.at("/type").asText(); + if (Arrays.asList("SubprocessLayout").contains(type)) continue; DAGNode node = this.nodes.get(type).newInstance(); node.setId(item.at("/id").asText()); if (node instanceof DAGSource) { @@ -50,8 +54,8 @@ public Map relations(Map items, JsonNode relat Iterator iterator = relations.iterator(); while (iterator.hasNext()) { JsonNode item = iterator.next(); - DAGNode source = items.get(item.get("sourceId").asText()); - DAGNode target = items.get(item.get("targetId").asText()); + DAGNode source = items.get(item.get("source").asText()); + DAGNode target = items.get(item.get("target").asText()); source.getTargets().add(target); target.getSources().add(source); } @@ -128,9 +132,106 @@ private boolean recursive(Map nodeMap) throws Exception { return !list.isEmpty(); } + public JsonNode edges(JsonNode nodes, JsonNode edges) { + /** + * 查找所有子流程 + * { + * processId: { + * children: { + * childId: {} + * }, + * incoming: { // edge.target + * childId: {} + * }, + * outgoing: { // edge.source + * childId: {} + * } + * } + * } + */ + ObjectNode processes = DPUtil.objectNode(); + Iterator iterator = nodes.iterator(); + while (iterator.hasNext()) { + JsonNode node = iterator.next(); + if (!"SubprocessLayout".equals(node.at("/type").asText())) continue; + ObjectNode process = processes.putObject(node.at("/id").asText()); + process.putObject("children"); // 全部子节点 + process.putObject("incoming"); // 入度为零的节点 + process.putObject("outgoing"); // 出度为零的节点 + } + /** + * 查找子流程内的节点 + */ + iterator = nodes.iterator(); + while (iterator.hasNext()) { + JsonNode node = iterator.next(); + String parent = node.at("/parent").asText(); + if (!processes.has(parent)) continue; + ObjectNode children = (ObjectNode) processes.at("/" + parent + "/children"); + children.putObject(node.at("/id").asText()); + } + /** + * 出入节点索引 + * sources -> { + * sourceId: [{}] + * } + * targets -> { + * targetId: [{}] + * } + */ + ObjectNode sources = DPUtil.objectNode(); + ObjectNode targets = DPUtil.objectNode(); + iterator = edges.iterator(); + while (iterator.hasNext()) { + JsonNode edge = iterator.next(); + String source = edge.at("/source").asText(); + String target = edge.at("/target").asText(); + (sources.has(source) ? (ArrayNode) sources.get(source) : sources.putArray(source)).add(edge); + (targets.has(target) ? (ArrayNode) targets.get(target) : targets.putArray(target)).add(edge); + } + /** + * 查找子流程内出入度为零的节点 + */ + iterator = processes.iterator(); + while (iterator.hasNext()) { + JsonNode process = iterator.next(); + ObjectNode incoming = (ObjectNode) process.at("/incoming"); + ObjectNode outgoing = (ObjectNode) process.at("/outgoing"); + Iterator> it = process.at("/children").fields(); + while (it.hasNext()) { + String key = it.next().getKey(); + if (!sources.has(key)) outgoing.putObject(key); + if (!targets.has(key)) incoming.putObject(key); + } + } + /** + * 展开子流程节点 + */ + ArrayNode result = DPUtil.arrayNode(); + iterator = edges.iterator(); + while (iterator.hasNext()) { + JsonNode edge = iterator.next(); + String source = edge.at("/source").asText(); + String target = edge.at("/target").asText(); + Iterator> sit = null; // 来源 -> processes[processId].outgoing + Iterator> tit = null; // 目标 -> processes[processId].incoming + sit = (processes.has(source) ? processes.get(source).get("outgoing") : DPUtil.objectNode().put(source, "")).fields(); + tit = (processes.has(target) ? processes.get(target).get("incoming") : DPUtil.objectNode().put(target, "")).fields(); + while (sit.hasNext()) { + String sk = sit.next().getKey(); + while (tit.hasNext()) { + String tk = tit.next().getKey(); + result.addObject().put("source", sk).put("target", tk); + } + } + } + return result; + } + public void execute() throws Exception { - Map items = items(diagram.at("/items")); - Map nodes = configure(relations(items, diagram.at("/relations"))); + Map items = items(diagram.at("/nodes")); + JsonNode edges = edges(diagram.at("/nodes"), diagram.at("/edges")); + Map nodes = configure(relations(items, edges)); // 查找入度为零的节点并执行 while(recursive(nodes)) {} } diff --git a/java/ext/build.gradle b/java/ext/build.gradle new file mode 100644 index 00000000..7225b68e --- /dev/null +++ b/java/ext/build.gradle @@ -0,0 +1,4 @@ + +subprojects { + archivesBaseName = rootProject.name + '-ext-' + name +} diff --git a/java/plugins/flink/build.gradle b/java/ext/flink/build.gradle similarity index 100% rename from java/plugins/flink/build.gradle rename to java/ext/flink/build.gradle diff --git a/java/ext/flink/checkpoints/.gitkeep b/java/ext/flink/checkpoints/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/java/ext/flink/conf/flink-conf.yaml b/java/ext/flink/conf/flink-conf.yaml new file mode 100644 index 00000000..63ce09f9 --- /dev/null +++ b/java/ext/flink/conf/flink-conf.yaml @@ -0,0 +1,4 @@ +jobmanager.rpc.address: 127.0.0.1 +jobmanager.rpc.port: 6123 +rest.address: 127.0.0.1 +rest.port: 8081 diff --git a/java/plugins/flink/src/main/java/com/iisquare/fs/plugins/flink/TestTransform.java b/java/ext/flink/src/main/java/com/iisquare/fs/ext/flink/TestTransform.java similarity index 91% rename from java/plugins/flink/src/main/java/com/iisquare/fs/plugins/flink/TestTransform.java rename to java/ext/flink/src/main/java/com/iisquare/fs/ext/flink/TestTransform.java index c5e09327..2c2dca0d 100644 --- a/java/plugins/flink/src/main/java/com/iisquare/fs/plugins/flink/TestTransform.java +++ b/java/ext/flink/src/main/java/com/iisquare/fs/ext/flink/TestTransform.java @@ -1,4 +1,4 @@ -package com.iisquare.fs.plugins.flink; +package com.iisquare.fs.ext.flink; import com.iisquare.fs.app.flink.core.FlinkRunner; import com.iisquare.fs.base.dag.core.DAGTransform; diff --git a/java/ext/flink/src/main/java/com/iisquare/fs/ext/flink/cli/CDCSubmitter.java b/java/ext/flink/src/main/java/com/iisquare/fs/ext/flink/cli/CDCSubmitter.java new file mode 100644 index 00000000..b7d0af5d --- /dev/null +++ b/java/ext/flink/src/main/java/com/iisquare/fs/ext/flink/cli/CDCSubmitter.java @@ -0,0 +1,24 @@ +package com.iisquare.fs.ext.flink.cli; + +import com.iisquare.fs.ext.flink.job.CDCJob; +import com.iisquare.fs.ext.flink.util.CLIUtil; +import org.apache.flink.client.cli.CliFrontend; + +import java.util.Arrays; +import java.util.List; + +public class CDCSubmitter { + + + public static void main(String[] args) { + System.out.println("Edit Environment FLINK_CONF_DIR=" + CLIUtil.path() + "conf"); + List params = Arrays.asList( + "run", + "-c", + CDCJob.class.getName(), + CLIUtil.path() + "build/libs/fs-project-ext-flink-0.0.1-SNAPSHOT.jar" + ); + CliFrontend.main(params.toArray(new String[0])); + } + +} diff --git a/java/ext/flink/src/main/java/com/iisquare/fs/ext/flink/job/CDCJob.java b/java/ext/flink/src/main/java/com/iisquare/fs/ext/flink/job/CDCJob.java new file mode 100644 index 00000000..ef7d6ce6 --- /dev/null +++ b/java/ext/flink/src/main/java/com/iisquare/fs/ext/flink/job/CDCJob.java @@ -0,0 +1,37 @@ +package com.iisquare.fs.ext.flink.job; + +import com.iisquare.fs.ext.flink.util.CLIUtil; +import com.ververica.cdc.connectors.postgres.PostgreSQLSource; +import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.util.Properties; + +public class CDCJob { + + public static void main(String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10000); + env.setStateBackend(new HashMapStateBackend()); + CheckpointConfig checkpoint = env.getCheckpointConfig(); + checkpoint.setCheckpointStorage("file://" + CLIUtil.path() + "checkpoints"); + checkpoint.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + checkpoint.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + PostgreSQLSource.Builder builder = PostgreSQLSource.builder() + .hostname("127.0.0.1").username("postgres").password("admin888") + .database("postgres").schemaList("public").tableList(".*") + .decodingPluginName("pgoutput") // could not access file "decoderbufs": No such file or directory + .deserializer(new JsonDebeziumDeserializationSchema()); + builder.debeziumProperties(new Properties(){{ + put("snapshot.mode", args[0]); + }}); + DataStreamSource source = env.addSource(builder.build()); + source.print(); + env.execute("cdc-job"); + } + +} diff --git a/java/ext/flink/src/main/java/com/iisquare/fs/ext/flink/util/CLIUtil.java b/java/ext/flink/src/main/java/com/iisquare/fs/ext/flink/util/CLIUtil.java new file mode 100644 index 00000000..ccf0b8dd --- /dev/null +++ b/java/ext/flink/src/main/java/com/iisquare/fs/ext/flink/util/CLIUtil.java @@ -0,0 +1,15 @@ +package com.iisquare.fs.ext.flink.util; + +import com.iisquare.fs.ext.flink.job.CDCJob; + +public class CLIUtil { + + public static String path () { + String path = CDCJob.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + int index = path.indexOf("/java/ext/flink/"); + if (-1 == index) throw new RuntimeException("path error:" + path); + path = path.substring(0, index) + "/java/ext/flink/"; + return path; + } + +} diff --git a/java/plugins/flink/src/test/java/com/iisquare/fs/plugins/flink/tester/DAGTester.java b/java/ext/flink/src/test/java/com/iisquare/fs/ext/flink/tester/DAGTester.java similarity index 85% rename from java/plugins/flink/src/test/java/com/iisquare/fs/plugins/flink/tester/DAGTester.java rename to java/ext/flink/src/test/java/com/iisquare/fs/ext/flink/tester/DAGTester.java index 72170711..618bdf39 100644 --- a/java/plugins/flink/src/test/java/com/iisquare/fs/plugins/flink/tester/DAGTester.java +++ b/java/ext/flink/src/test/java/com/iisquare/fs/ext/flink/tester/DAGTester.java @@ -1,4 +1,4 @@ -package com.iisquare.fs.plugins.flink.tester; +package com.iisquare.fs.ext.flink.tester; import com.iisquare.fs.app.flink.FlinkApplication; import org.junit.Test; diff --git a/java/ext/flink/src/test/java/com/iisquare/fs/ext/flink/tester/SubmitterTest.java b/java/ext/flink/src/test/java/com/iisquare/fs/ext/flink/tester/SubmitterTest.java new file mode 100644 index 00000000..d53b5383 --- /dev/null +++ b/java/ext/flink/src/test/java/com/iisquare/fs/ext/flink/tester/SubmitterTest.java @@ -0,0 +1,29 @@ +package com.iisquare.fs.ext.flink.tester; + +import com.iisquare.fs.ext.flink.util.CLIUtil; +import com.iisquare.fs.ext.flink.job.CDCJob; +import org.apache.flink.client.cli.CliFrontend; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class SubmitterTest { + + @Test + public void cdcTest() { + System.out.println("Environment FLINK_CONF_DIR=" + CLIUtil.path() + "conf"); + List params = Arrays.asList( + "run", +// "--fromSavepoint", +// "D:\\htdocs\\fs-project-vip\\java\\ext\\flink\\checkpoints\\7eab743ad591b2bbd3f6046384874d7a\\chk-0", +// "--allowNonRestoredState", // 当作业发生变更时,允许保留匹配节点 + "-c", + CDCJob.class.getName(), + CLIUtil.path() + "build/libs/fs-project-ext-flink-0.0.1-SNAPSHOT.jar", + "never" + ); + CliFrontend.main(params.toArray(new String[0])); + } + +} diff --git a/java/plugins/spark/build.gradle b/java/ext/spark/build.gradle similarity index 100% rename from java/plugins/spark/build.gradle rename to java/ext/spark/build.gradle diff --git a/java/plugins/spark/src/main/java/com/iisquare/fs/plugins/spark/TestTransform.java b/java/ext/spark/src/main/java/com/iisquare/fs/ext/spark/TestTransform.java similarity index 83% rename from java/plugins/spark/src/main/java/com/iisquare/fs/plugins/spark/TestTransform.java rename to java/ext/spark/src/main/java/com/iisquare/fs/ext/spark/TestTransform.java index db3a1e20..505c7229 100644 --- a/java/plugins/spark/src/main/java/com/iisquare/fs/plugins/spark/TestTransform.java +++ b/java/ext/spark/src/main/java/com/iisquare/fs/ext/spark/TestTransform.java @@ -1,4 +1,4 @@ -package com.iisquare.fs.plugins.spark; +package com.iisquare.fs.ext.spark; import com.iisquare.fs.base.dag.core.DAGTransform; diff --git a/java/plugins/spark/src/test/java/com/iisquare/fs/plugins/spark/tester/DAGTester.java b/java/ext/spark/src/test/java/com/iisquare/fs/ext/spark/tester/DAGTester.java similarity index 85% rename from java/plugins/spark/src/test/java/com/iisquare/fs/plugins/spark/tester/DAGTester.java rename to java/ext/spark/src/test/java/com/iisquare/fs/ext/spark/tester/DAGTester.java index 1611aa84..691d155b 100644 --- a/java/plugins/spark/src/test/java/com/iisquare/fs/plugins/spark/tester/DAGTester.java +++ b/java/ext/spark/src/test/java/com/iisquare/fs/ext/spark/tester/DAGTester.java @@ -1,4 +1,4 @@ -package com.iisquare.fs.plugins.spark.tester; +package com.iisquare.fs.ext.spark.tester; import com.iisquare.fs.app.spark.SparkApplication; import org.junit.Test; diff --git a/java/plugins/build.gradle b/java/plugins/build.gradle deleted file mode 100644 index 70c7a871..00000000 --- a/java/plugins/build.gradle +++ /dev/null @@ -1,4 +0,0 @@ - -subprojects { - archivesBaseName = rootProject.name + '-plugins-' + name -} diff --git a/java/settings.gradle b/java/settings.gradle index e4dd4dde..a3074df9 100644 --- a/java/settings.gradle +++ b/java/settings.gradle @@ -17,8 +17,8 @@ include 'app:crawler' include 'app:spark' include 'app:flink' -include 'plugins:flink' -include 'plugins:spark' +include 'ext:flink' +include 'ext:spark' include 'web:admin' include 'web:core' diff --git a/java/web/bi/src/main/java/com/iisquare/fs/web/bi/service/DAGService.java b/java/web/bi/src/main/java/com/iisquare/fs/web/bi/service/DAGService.java index 0cad55e2..a10ba169 100644 --- a/java/web/bi/src/main/java/com/iisquare/fs/web/bi/service/DAGService.java +++ b/java/web/bi/src/main/java/com/iisquare/fs/web/bi/service/DAGService.java @@ -30,22 +30,43 @@ public ObjectNode diagram(Integer id, ObjectNode cached) { if (null != cached) cached.replace(sid, null); return null; } - ObjectNode node = DPUtil.objectNode(); - node.put("id", diagram.getId()).put("name", diagram.getName()); - node.put("engine", diagram.getEngine()).put("model", diagram.getModel()); + ObjectNode result = DPUtil.objectNode(); + result.put("id", diagram.getId()).put("name", diagram.getName()); + result.put("engine", diagram.getEngine()).put("model", diagram.getModel()); JsonNode content = DPUtil.parseJSON(diagram.getContent()); if (null == content || !content.isObject()) content = DPUtil.objectNode(); - ArrayNode items = node.putArray("items"); + ArrayNode nodes = result.putArray("nodes"); + ArrayNode edges = result.putArray("edges"); List exportIds = new ArrayList<>(); ObjectNode importIds = DPUtil.objectNode(); ArrayNode jars = DPUtil.arrayNode(); - Iterator iterator = content.at("/items").iterator(); + ArrayNode exports = DPUtil.arrayNode(); + Iterator iterator = content.at("/cells").iterator(); while (iterator.hasNext()) { JsonNode json = iterator.next(); + String shape = json.at("/shape").asText(""); + if ("flow-edge".equals(shape)) { + String source = "dag_" + diagram.getId() + "_" + json.at("/source/cell").asText(); + String target = "dag_" + diagram.getId() + "_" + json.at("/target/cell").asText(); + if (exportIds.contains(target)) { + exports.add(target); + continue; + } + if (importIds.has(source)) { + Iterator it = importIds.get(source).iterator(); + while (it.hasNext()) { + edges.addObject().put("source", it.next().asText()).put("target", target); + } + continue; + } + edges.addObject().put("source", source).put("target", target); + continue; + } String nodeId = "dag_" + diagram.getId() + "_" + json.at("/id").asText(); - String type = json.at("/type").asText(); - JsonNode options = json.at("/options"); + String type = json.at("/data/type").asText(); + JsonNode options = json.at("/data/options"); if (!options.isObject()) options = DPUtil.objectNode(); + if ("GroupLayout".equals(type)) continue; if ("ExportConfig".equals(type)) { exportIds.add(nodeId); continue; @@ -53,48 +74,31 @@ public ObjectNode diagram(Integer id, ObjectNode cached) { if ("ImportConfig".equals(type)) { ObjectNode sub = diagram(DPUtil.parseInt(options.at("/id").asText()), cached); importIds.replace(nodeId, null == sub ? DPUtil.arrayNode() : sub.at("/exports")); - items.addAll((ArrayNode) sub.at("/items")); + nodes.addAll((ArrayNode) sub.at("/nodes")); + jars.addAll((ArrayNode) sub.at("/jars")); continue; } if ("ScriptTransform".equals(type)) { String jar = options.at("/jarURI").asText(); if (!DPUtil.empty(jar)) jars.add(jar); } - ObjectNode item = items.addObject(); - item.put("id", nodeId); - item.put("type", type); + ObjectNode node = nodes.addObject(); + node.put("id", nodeId); + node.put("type", type); + String parent = json.at("/parent").asText(); + node.put("parent", DPUtil.empty(parent) ? "" : ("dag_" + diagram.getId() + "_" + parent)); if (type.endsWith("Source") || type.endsWith("Transform")) { - item.put("alias", json.at("/alias").asText()); + node.put("alias", json.at("/data/alias").asText()); } if (type.endsWith("Source") || type.endsWith("Transform") || type.endsWith("Sink")) { - item.put("kvConfigPrefix", json.at("/kvConfigPrefix").asText()); - } - item.replace("options", options); - } - ArrayNode relations = node.putArray("relations"); - ArrayNode exports = DPUtil.arrayNode(); - iterator = content.at("/relations").iterator(); - while (iterator.hasNext()) { - JsonNode json = iterator.next(); - String sourceId = "dag_" + diagram.getId() + "_" + json.at("/sourceId").asText(); - String targetId = "dag_" + diagram.getId() + "_" + json.at("/targetId").asText(); - if (exportIds.contains(targetId)) { - exports.add(sourceId); - continue; - } - if (importIds.has(sourceId)) { - Iterator it = importIds.get(sourceId).iterator(); - while (it.hasNext()) { - relations.addObject().put("sourceId", it.next().asText()).put("targetId", targetId); - } - continue; + node.put("kvConfigPrefix", json.at("/data/kvConfigPrefix").asText()); } - relations.addObject().put("sourceId", sourceId).put("targetId", targetId); + node.replace("options", options); } - node.replace("exports", exports); - node.replace("jars", jars); - if (null == cached) cached.replace(sid, node); - return node; + result.replace("exports", exports); + result.replace("jars", jars); + if (null == cached) cached.replace(sid, result); + return result; } } diff --git a/static/README.md b/static/README.md index d99a036c..fb8a22a7 100644 --- a/static/README.md +++ b/static/README.md @@ -31,3 +31,32 @@ yarn upgrade [package]@[version] yarn upgrade –latest [package] yarn remove ``` + +## 反向代理 + +### Nginx +``` +location /uri/ { + # /uri/index.html -> /path/to/static/uri/index.html + # root /path/to/static/; + + # /uri/index.html -> /path/to/static/index.html + # alias /path/to/static/; + + # gzip on; + + # ETag: "文件大小的十六进制"; Last-Modified: UTC DateTime; Status Code: 304 OK + # etag on; + + # exact:完全符合; before:响应的修改时间小于或等于请求头中的 “If-Modified-Since” 字段的时间 + # if_modified_since off | exact | before; + + # 在缓存期内不会请求服务端,更不会触发ETag判断。Status Code: 200 OK (from disk/memory cache) + # expires 30d; + + # autoindex on; + # autoindex_localtime on; + # autoindex_exact_size off; + +} +``` diff --git a/static/admin/package.json b/static/admin/package.json index d25c04ca..64d52615 100644 --- a/static/admin/package.json +++ b/static/admin/package.json @@ -14,6 +14,8 @@ }, "dependencies": { "@antv/data-set": "^0.10.2", + "@antv/x6": "^1.32.3", + "@antv/x6-vue-shape": "^1.4.0", "@toast-ui/editor": "3.1.2", "@toast-ui/editor-plugin-code-syntax-highlight": "^3.0.0", "@toast-ui/editor-plugin-table-merged-cell": "3.0.1", @@ -55,6 +57,7 @@ "@vue/cli-plugin-unit-jest": "^4.0.4", "@vue/cli-plugin-vuex": "^4.0.4", "@vue/cli-service": "^4.0.4", + "@vue/composition-api": "^1.6.2", "@vue/eslint-config-standard": "^4.0.0", "@vue/test-utils": "^1.0.0-beta.29", "babel-eslint": "^10.0.1", diff --git a/static/admin/src/assets/icons/action/action-fit.svg b/static/admin/src/assets/icons/action/action-fit.svg new file mode 100644 index 00000000..8d25ee35 --- /dev/null +++ b/static/admin/src/assets/icons/action/action-fit.svg @@ -0,0 +1,6 @@ + + + + + + diff --git a/static/admin/src/assets/icons/action/index.js b/static/admin/src/assets/icons/action/index.js index c479569f..741ba1af 100644 --- a/static/admin/src/assets/icons/action/index.js +++ b/static/admin/src/assets/icons/action/index.js @@ -5,6 +5,7 @@ import actionHand from '@/assets/icons/action/action-hand.svg?inline' import actionLasso from '@/assets/icons/action/action-lasso.svg?inline' import actionConnection from '@/assets/icons/action/action-connection.svg?inline' import actionSpace from '@/assets/icons/action/action-space.svg?inline' +import actionFit from '@/assets/icons/action/action-fit.svg?inline' export default { actionAlignHorizontal, @@ -13,5 +14,6 @@ export default { actionHand, actionLasso, actionConnection, - actionSpace + actionSpace, + actionFit } diff --git a/static/admin/src/assets/icons/workflow/index.js b/static/admin/src/assets/icons/workflow/index.js index 7993a3be..03ec902b 100644 --- a/static/admin/src/assets/icons/workflow/index.js +++ b/static/admin/src/assets/icons/workflow/index.js @@ -6,6 +6,7 @@ import workflowParallelGateway from '@/assets/icons/workflow/workflow-parallel-g import workflowInclusiveGateway from '@/assets/icons/workflow/workflow-inclusive-gateway.svg?inline' import workflowPool from '@/assets/icons/workflow/workflow-pool.svg?inline' import workflowGroup from '@/assets/icons/workflow/workflow-group.svg?inline' +import workflowSubprocess from '@/assets/icons/workflow/workflow-subprocess.svg?inline' export default { workflowStartEvent, @@ -15,5 +16,6 @@ export default { workflowParallelGateway, workflowInclusiveGateway, workflowPool, - workflowGroup + workflowGroup, + workflowSubprocess } diff --git a/static/admin/src/assets/icons/workflow/workflow-group.svg b/static/admin/src/assets/icons/workflow/workflow-group.svg index ed9ce0bd..81021b70 100644 --- a/static/admin/src/assets/icons/workflow/workflow-group.svg +++ b/static/admin/src/assets/icons/workflow/workflow-group.svg @@ -1 +1,15 @@ - \ No newline at end of file + + + + 1198 + + + + + + + + + + + \ No newline at end of file diff --git a/static/admin/src/assets/icons/workflow/workflow-subprocess.svg b/static/admin/src/assets/icons/workflow/workflow-subprocess.svg new file mode 100644 index 00000000..b074233a --- /dev/null +++ b/static/admin/src/assets/icons/workflow/workflow-subprocess.svg @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/static/admin/src/components/X6/FlowEdge.js b/static/admin/src/components/X6/FlowEdge.js new file mode 100644 index 00000000..7192e391 --- /dev/null +++ b/static/admin/src/components/X6/FlowEdge.js @@ -0,0 +1,15 @@ +import { Shape } from '@antv/x6' + +export default class FlowEdge extends Shape.Edge { +} + +FlowEdge.config({ + attrs: { + line: { + stroke: '#A2B1C3', + strokeWidth: 2, + targetMarker: { name: 'block', width: 12, height: 8 } + } + }, + zIndex: 0 +}) diff --git a/static/admin/src/components/X6/FlowGroup.js b/static/admin/src/components/X6/FlowGroup.js new file mode 100644 index 00000000..aa4ed0e7 --- /dev/null +++ b/static/admin/src/components/X6/FlowGroup.js @@ -0,0 +1,151 @@ +import { Node } from '@antv/x6' + +export default class FlowGroup extends Node { + constructor (meta) { + super(meta) + this.meta = meta + this.collapsed = false + this.attr('label/text', this.meta.data.title) + } + + postprocess () { + this.on('change:data', ({ current }) => { + Object.assign(this.meta, { data: current }) + this.attr('label/text', this.meta.data.title) + }) + this.toggleCollapse(false) + } + + isCollapsed () { + return this.collapsed + } + + addTransientEdge (graph) { + const node = this + const incoming = graph.getConnectedEdges(node, { deep: true, incoming: true }) + const outgoing = graph.getConnectedEdges(node, { deep: true, outgoing: true }) + incoming.forEach(item => { + if (!item.source.cell || !item.source.port) return + graph.addEdge({ + attrs: { + line: { + stroke: '#A2B1C3', + strokeWidth: 2, + targetMarker: { name: 'block', width: 12, height: 8 } + } + }, + source: { cell: item.source.cell, port: item.source.port }, + target: node + }) + }) + outgoing.forEach(item => { + if (!item.target.cell || !item.target.port) return + graph.addEdge({ + attrs: { + line: { + stroke: '#A2B1C3', + strokeWidth: 2, + targetMarker: { name: 'block', width: 12, height: 8 } + } + }, + source: node, + target: { cell: item.target.cell, port: item.target.port } + }) + }) + } + + removeTransientEdge (graph) { + const node = this + const edges = graph.getConnectedEdges(node) + edges.forEach(edge => { + graph.removeEdge(edge) + }) + } + + toggleCollapse (collapsed = null) { + const target = collapsed === null ? !this.collapsed : collapsed + if (target) { + this.attr('buttonSign', { d: 'M 1 5 9 5 M 5 1 5 9' }) + Object.assign(this.meta, this.getSize()) + this.resize(150, 32) + } else { + this.attr('buttonSign', { d: 'M 2 5 8 5' }) + if (this.meta) { + this.resize(this.meta.width, this.meta.height) + } + } + this.collapsed = target + } +} + +FlowGroup.config({ + markup: [ + { + tagName: 'rect', + selector: 'body' + }, + { + tagName: 'text', + selector: 'label' + }, + { + tagName: 'g', + selector: 'buttonGroup', + children: [ + { + tagName: 'rect', + selector: 'button', + attrs: { + 'pointer-events': 'visiblePainted' + } + }, + { + tagName: 'path', + selector: 'buttonSign', + attrs: { + fill: 'none', + 'pointer-events': 'none' + } + } + ] + } + ], + attrs: { + body: { + rx: 10, + ry: 10, + refWidth: '100%', + refHeight: '100%', + stroke: 'rgb(34, 36, 42)', + strokeWidth: '1px', + fill: '#ffffff', + fillOpacity: 0.3, + strokeDasharray: '8, 3, 1, 3' + }, + buttonGroup: { + refX: 8, + refY: 8 + }, + button: { + height: 14, + width: 16, + rx: 2, + ry: 2, + fill: '#f5f5f5', + stroke: '#ccc', + cursor: 'pointer', + event: 'node:collapse' + }, + buttonSign: { + refX: 3, + refY: 2, + stroke: '#808080' + }, + label: { + fontSzie: 12, + refX: '50%', + refY: 10, + textAnchor: 'middle' + } + } +}) diff --git a/static/admin/src/components/X6/FlowNode.vue b/static/admin/src/components/X6/FlowNode.vue new file mode 100644 index 00000000..0f55279b --- /dev/null +++ b/static/admin/src/components/X6/FlowNode.vue @@ -0,0 +1,66 @@ + + + + + diff --git a/static/admin/src/components/X6/FlowSubprocess.js b/static/admin/src/components/X6/FlowSubprocess.js new file mode 100644 index 00000000..b21f97ee --- /dev/null +++ b/static/admin/src/components/X6/FlowSubprocess.js @@ -0,0 +1,108 @@ +import { Node } from '@antv/x6' + +export default class FlowSubprocess extends Node { + constructor (meta) { + super(meta) + this.meta = meta + this.collapsed = false + this.attr('label/text', this.meta.data.title) + } + + postprocess () { + this.on('change:data', ({ current }) => { + Object.assign(this.meta, { data: current }) + this.attr('label/text', this.meta.data.title) + }) + this.toggleCollapse(false) + } + + isCollapsed () { + return this.collapsed + } + + toggleCollapse (collapsed = null) { + const target = collapsed === null ? !this.collapsed : collapsed + if (target) { + this.attr('buttonSign', { d: 'M 1 5 9 5 M 5 1 5 9' }) + Object.assign(this.meta, this.getSize()) + this.resize(150, 32) + } else { + this.attr('buttonSign', { d: 'M 2 5 8 5' }) + if (this.meta) { + this.resize(this.meta.width, this.meta.height) + } + } + this.collapsed = target + } +} + +FlowSubprocess.config({ + markup: [ + { + tagName: 'rect', + selector: 'body' + }, + { + tagName: 'text', + selector: 'label' + }, + { + tagName: 'g', + selector: 'buttonGroup', + children: [ + { + tagName: 'rect', + selector: 'button', + attrs: { + 'pointer-events': 'visiblePainted' + } + }, + { + tagName: 'path', + selector: 'buttonSign', + attrs: { + fill: 'none', + 'pointer-events': 'none' + } + } + ] + } + ], + attrs: { + body: { + rx: 10, + ry: 10, + refWidth: '100%', + refHeight: '100%', + stroke: 'rgb(34, 36, 42)', + strokeWidth: '1px', + fill: '#ffffff', + fillOpacity: 0.3 + }, + buttonGroup: { + refX: 8, + refY: 8 + }, + button: { + height: 14, + width: 16, + rx: 2, + ry: 2, + fill: '#f5f5f5', + stroke: '#ccc', + cursor: 'pointer', + event: 'node:collapse' + }, + buttonSign: { + refX: 3, + refY: 2, + stroke: '#808080' + }, + label: { + fontSzie: 12, + refX: '50%', + refY: 10, + textAnchor: 'middle' + } + } +}) diff --git a/static/admin/src/components/X6/flow.js b/static/admin/src/components/X6/flow.js new file mode 100644 index 00000000..7f68b750 --- /dev/null +++ b/static/admin/src/components/X6/flow.js @@ -0,0 +1,370 @@ +import * as X6 from './index' + +class Flow { + constructor (container, options) { + const _this = this + this.counter = 0 + this.container = container + this.shapeSizes = { + 'flow-node': { + width: 150, + height: 40, + offsetX: 60, + offsetY: 18 + }, + 'flow-group': { + width: 300, + height: 300, + offsetX: 150, + offsetY: 150 + }, + 'flow-subprocess': { + width: 300, + height: 300, + offsetX: 150, + offsetY: 150 + } + } + this.options = Object.assign({ + onCellClick ({ e, x, y, cell, view }) {}, + onCellDoubleClick ({ e, x, y, cell, view }) {}, + onCellContextmenu ({ e, x, y, cell, view }) {}, + onBlankClick ({ e, x, y }) {}, + onBlankDoubleClick ({ e, x, y }) {}, + onBlankContextmenu ({ e, x, y }) {}, + onNodeAdded ({ node, index, options }) {}, + onEdgeConnected () {} + }, options) + this.graph = new X6.Graph({ + container: this.container, + grid: true, + mousewheel: { + enabled: true, + zoomAtMousePosition: true, + modifiers: 'ctrl', + minScale: 0.5, + maxScale: 3 + }, + connecting: { + router: { name: 'manhattan', args: { padding: 1 } }, + connector: { name: 'rounded', args: { radius: 8 } }, + allowBlank: false, + allowMulti: false, + allowPort: true, + allowEdge: false, + allowNode: false, + allowLoop: false, + snap: { radius: 20 }, + createEdge () { + return new X6.FlowEdge() + }, + validateConnection ({ targetMagnet, sourceCell, targetCell }) { + if (!targetMagnet) return false + return _this.subprocess(sourceCell) === _this.subprocess(targetCell) + } + }, + highlighting: { + magnetAdsorbed: { name: 'stroke', args: { attrs: { fill: '#5F95FF', stroke: '#5F95FF' } } } + }, + panning: true, + resizing: { enabled: true, minWidth: 100, minHeight: 40 }, + rotating: true, + selecting: { enabled: true, rubberband: false, showNodeSelectionBox: true }, + snapline: true, + clipboard: true, + history: { + enabled: true + }, + keyboard: { + enabled: true + }, + embedding: { + enabled: true, + findParent ({ node }) { + const bbox = node.getBBox() + return this.getNodes().filter((node) => { + if (!(node instanceof X6.FlowGroup) && !(node instanceof X6.FlowSubprocess)) return false + const targetBBox = node.getBBox() + return bbox.isIntersectWithRect(targetBBox) + }) + } + } + }) + this.dnd = new X6.Addon.Dnd({ target: this.graph }) + this.__bindEvent() + } + + __bindEvent () { + this.graph.bindKey(['meta+c', 'ctrl+c'], () => { // 复制 + const cells = this.graph.getSelectedCells() + if (cells.length) { this.graph.copy(cells) } + return false + }) + this.graph.bindKey(['meta+x', 'ctrl+x'], () => { // 剪切 + const cells = this.graph.getSelectedCells() + if (cells.length) { this.graph.cut(cells) } + return false + }) + this.graph.bindKey(['meta+v', 'ctrl+v'], () => { // 粘贴 + if (!this.graph.isClipboardEmpty()) { + const cells = this.graph.paste({ offset: 32 }) + this.graph.cleanSelection() + this.graph.select(cells) + } + return false + }) + this.graph.bindKey(['meta+z', 'ctrl+z'], () => { // 撤销 + if (this.graph.history.canUndo()) { + this.graph.history.undo() + } + return false + }) + this.graph.bindKey(['meta+shift+z', 'ctrl+shift+z'], () => { // 重做 + if (this.graph.history.canRedo()) { + this.graph.history.redo() + } + return false + }) + this.graph.bindKey(['meta+a', 'ctrl+a'], () => { // 全选 + const nodes = this.graph.getNodes() + if (nodes) { this.graph.select(nodes) } + return false + }) + this.graph.bindKey('delete', () => { // 删除 + const cells = this.graph.getSelectedCells() + if (cells.length) { this.graph.removeCells(cells) } + }) + this.graph.bindKey(['ctrl+1', 'meta+1'], () => { // 放大 + const zoom = this.graph.zoom() + if (zoom < 1.5) { this.graph.zoom(0.1) } + }) + this.graph.bindKey(['ctrl+2', 'meta+2'], () => { // 缩小 + const zoom = this.graph.zoom() + if (zoom > 0.5) { this.graph.zoom(-0.1) } + }) + this.graph.on('node:mouseenter', () => { + const ports = this.container.querySelectorAll('.x6-port-body') + this.showPorts(ports, true) + }) + this.graph.on('node:mouseleave', () => { + const ports = this.container.querySelectorAll('.x6-port-body') + this.showPorts(ports, false) + }) + this.graph.on('node:collapse', ({ e, node }) => { + e.stopPropagation() + node.toggleCollapse() + const collapsed = node.isCollapsed() + if (node instanceof X6.FlowGroup) { + collapsed ? node.addTransientEdge(this.graph) : node.removeTransientEdge(this.graph) + } + const collapse = (parent) => { + const cells = parent.getChildren() + if (cells) { + cells.forEach((cell) => { + collapsed ? cell.hide() : cell.show() + if (cell instanceof X6.FlowGroup || cell instanceof X6.FlowSubprocess) { + if (!cell.isCollapsed()) collapse(cell) + } + }) + } + } + collapse(node) + }) + this.graph.on('edge:mouseenter', ({ cell }) => { + cell.addTools([ + { name: 'circle-source-arrowhead' }, + { name: 'circle-target-arrowhead' } + ]) + }) + this.graph.on('edge:mouseleave', ({ cell }) => { + cell.removeTools() + }) + this.graph.on('edge:connected', (data) => { + data.isNew && data.edge.setData({ title: '', description: '', options: {} }) + this.options.onEdgeConnected(data) + }) + this.graph.on('node:added', (data) => { + this.options.onNodeAdded(data) + }) + this.graph.on('node:embedded', ({ node, currentParent }) => { + if (currentParent) node.setZIndex(currentParent.getZIndex() + 1) + }) + this.graph.on('cell:click', data => { + if (!data.cell.getData()) return true + if (this.options.onCellClick(data)) data.e.stopPropagation() + }) + this.graph.on('cell:dblclick', data => { + if (this.options.onCellDoubleClick(data)) data.e.stopPropagation() + }) + this.graph.on('cell:contextmenu', data => { + if (this.options.onCellContextmenu(data)) data.e.stopPropagation() + }) + this.graph.on('blank:click', data => { + if (this.options.onBlankClick(data)) data.e.stopPropagation() + }) + this.graph.on('blank:dblclick', data => { + if (this.options.onBlankDoubleClick(data)) data.e.stopPropagation() + }) + this.graph.on('blank:contextmenu', data => { + if (this.options.onBlankContextmenu(data)) data.e.stopPropagation() + }) + } + + panning () { + this.graph.enablePanning() + this.graph.disableRubberband() + } + + selecting () { + this.graph.disablePanning() + this.graph.enableRubberband() + } + + fitting () { + this.graph.centerContent() + } + + showPorts (ports, show) { + for (let i = 0, len = ports.length; i < len; i = i + 1) { + ports[i].style.visibility = show ? 'visible' : 'hidden' + } + } + + reset (cells = []) { + this.graph.freeze() + this.graph.fromJSON(cells) + cells.forEach(cell => { + if (cell.shape === 'flow-edge') { + this.graph.getCellById(cell.id).setLabels(cell.data.title) + } else { + if (cell.data.index > this.counter) this.counter = cell.data.index + } + }) + this.graph.unfreeze() + } + + collect () { + const result = this.graph.toJSON() + result.cells = result.cells.map(cell => { + return this.briefJSON(cell) + }) + return result + } + + toggleGrid (visible) { + visible ? this.graph.showGrid() : this.graph.hideGrid() + } + + generateNode (widget, ev, callback) { + const counter = ++this.counter + const point = this.graph.pageToLocal(ev.pageX, ev.pageY) + const shapeSize = this.shapeSizes[widget.shape] + const item = { + id: `node_${counter}`, + x: point.x - shapeSize.offsetX, + y: point.y - shapeSize.offsetY, + width: shapeSize.width, + height: shapeSize.height, + shape: widget.shape, + data: Object.assign({ + index: counter, + title: `${widget.label}_${counter}`, + icon: widget.icon, + type: widget.type, + description: widget.title, + options: widget.options() + }, callback ? callback() : {}) + } + return item + } + + addNode (item) { + return this.graph.addNode(item) + } + + widgetDragStart (widget, ev, callback) { + const item = this.generateNode(widget, ev, callback) + const node = this.graph.createNode(item) + this.dnd.start(node, ev) + return item + } + + updateCell (item) { + if (!item) return false + const cell = this.graph.getCellById(item.id) + if (!cell) return false + cell.setData(item.data) + return true + } + + cell2meta (cell) { + const json = cell.toJSON() + return this.briefJSON(json) + } + + briefJSON (json) { + const result = { + id: json.id, + shape: json.shape, + zIndex: json.zIndex + } + if (json.data) result.data = json.data + if (['flow-edge', 'edge'].indexOf(result.shape) !== -1) { + Object.assign(result, { + source: json.source, + target: json.target + }) + } else { + Object.assign(result, { + x: json.position.x, + y: json.position.y, + width: json.size.width, + height: json.size.height + }) + if (json.angle) result.angle = json.angle + if (json.parent) result.parent = json.parent + if (json.children) result.children = json.children + } + return result + } + + subprocess (node) { + if (!node) return null + if (node.parent instanceof X6.FlowSubprocess) return node.parent + return this.subprocess(node.parent) + } + + highlight (cells) { + this.graph.getCells().forEach(cell => { + switch (cell.shape) { + case 'flow-edge': + cell.attr({ line: { stroke: '#A2B1C3' } }) + break + case 'flow-group': + case 'flow-subprocess': + cell.attr({ text: { fill: '' } }) + break + case 'flow-node': + cell.attr({ foreignObject: { class: '' } }) + break + } + }) + cells.forEach(cell => { + cell = this.graph.getCellById(cell.id) + switch (cell.shape) { + case 'flow-edge': + cell.attr({ line: { stroke: '#409eff' } }) + break + case 'flow-group': + case 'flow-subprocess': + cell.attr({ text: { fill: '#409eff' } }) + break + case 'flow-node': + cell.attr({ foreignObject: { class: 'fs-flow-highlight' } }) + break + } + }) + } +} + +export default Flow diff --git a/static/admin/src/components/X6/flow.less b/static/admin/src/components/X6/flow.less new file mode 100644 index 00000000..56b395cf --- /dev/null +++ b/static/admin/src/components/X6/flow.less @@ -0,0 +1,77 @@ +.widget () { + .x6-widget-stencil { + background-color: #fff; + } + .x6-widget-stencil-title { + background-color: #fff; + } + .x6-widget-stencil-group-title { + background-color: #fff !important; + } + .x6-widget-transform { + margin: -1px 0 0 -1px; + padding: 0px; + border: 1px solid #239edd; + } + .x6-widget-transform > div { + border: 1px solid #239edd; + } + .x6-widget-transform > div:hover { + background-color: #3dafe4; + } + .x6-widget-transform-active-handle { + background-color: #3dafe4; + } + .x6-widget-transform-resize { + border-radius: 0; + } + .x6-widget-selection-inner { + border: 1px solid #239edd; + } + .x6-widget-selection-box { + opacity: 0; + } + .fs-flow-highlight { + .fs-flow-node { + color: #409eff; + } + } +} + +.property (@width) { + height: 100%; + width: @width; + display: inline-block; + border-left: solid 1px #cbcccc; + & /deep/ .ant-tabs { + height: 100%; + } + & /deep/ .ant-tabs-bar { + margin: 0px; + } + & /deep/ .ant-tabs-content { + height: calc(100% - 44px); + overflow-y: scroll; + padding: 10px 15px 10px 15px; + } + & /deep/ .ant-form-item { + margin-bottom: 0px; + padding: 5px 0px; + } + & /deep/ .fs-property-title { + color: rgba(0,0,0,.85); + font-weight: 600; + font-size: 14px; + line-height: 1.5; + padding: 5px 0px; + .fs-property-action { + float: right; + .ant-space-item { + cursor: pointer; + } + .fs-action-delete:hover { + color: #ff4d4f; + } + } + } +} diff --git a/static/admin/src/components/X6/index.js b/static/admin/src/components/X6/index.js new file mode 100644 index 00000000..64717ea4 --- /dev/null +++ b/static/admin/src/components/X6/index.js @@ -0,0 +1,73 @@ +import { Graph, Shape, Addon } from '@antv/x6' +import '@antv/x6-vue-shape' +import FlowEdge from './FlowEdge' +import FlowGroup from './FlowGroup' +import FlowSubprocess from './FlowSubprocess' + +const CircleAttr = { + circle: { + r: 4, + magnet: true, + stroke: '#5F95FF', + strokeWidth: 1, + fill: '#fff', + style: { visibility: 'hidden' } + } +} +const D4Port = { + groups: { + top: { position: 'top', attrs: CircleAttr }, + right: { position: 'right', attrs: CircleAttr }, + bottom: { position: 'bottom', attrs: CircleAttr }, + left: { position: 'left', attrs: CircleAttr } + }, + items: [ + { id: 'top', group: 'top' }, + { id: 'right', group: 'right' }, + { id: 'bottom', group: 'bottom' }, + { id: 'left', group: 'left' } + ] +} + +Graph.registerNode('flow-node', { + inherit: 'vue-shape', + component: { template: ``, components: { FlowNode: () => import('./FlowNode') } }, + ports: D4Port +}) + +Graph.registerEdge('flow-edge', FlowEdge) + +Graph.registerNode('flow-group', FlowGroup) + +Graph.registerNode('flow-subprocess', { + inherit: FlowSubprocess, + ports: D4Port +}) + +Graph.registerEdgeTool('circle-source-arrowhead', { + inherit: 'source-arrowhead', + tagName: 'circle', + attrs: { + r: 3, + fill: '#31d0c6', + 'fill-opacity': 0.3, + stroke: '#fe854f', + 'stroke-width': 2, + cursor: 'move' + } +}) + +Graph.registerEdgeTool('circle-target-arrowhead', { + inherit: 'target-arrowhead', + tagName: 'circle', + attrs: { + r: 3, + fill: '#31d0c6', + 'fill-opacity': 0.3, + stroke: '#fe854f', + 'stroke-width': 2, + cursor: 'move' + } +}) + +export { Graph, Shape, Addon, FlowEdge, FlowGroup, FlowSubprocess } diff --git a/static/admin/src/views/bi/diagram/design/APIConfigProperty.vue b/static/admin/src/views/bi/diagram/design/APIConfigProperty.vue index c5d3e1e6..e1cb9ef2 100644 --- a/static/admin/src/views/bi/diagram/design/APIConfigProperty.vue +++ b/static/admin/src/views/bi/diagram/design/APIConfigProperty.vue @@ -2,17 +2,26 @@ - +
接口配置
- + - + {{ item.label }} - - - + + +
@@ -26,8 +35,11 @@ export default { }, props: { value: { type: Object, required: true }, + flow: { type: Object, required: true }, config: { type: Object, required: true }, - activeItem: { type: Object, required: true } + diagram: { type: Object, required: true }, + activeItem: { type: Object, default: null }, + tips: { type: String, default: '' } }, data () { return { @@ -36,7 +48,7 @@ export default { }, computed: { defaults () { - return this.config.widgetDefaults(this.value.type) + return this.config.widgetDefaults(this.value.data.type) } }, watch: { @@ -50,14 +62,13 @@ export default { methods: { formatted (obj) { const options = { - url: obj.options.url || this.defaults.url, - method: obj.options.method || this.defaults.method, - checkField: obj.options.checkField || this.defaults.checkField, - checkValue: obj.options.checkValue || this.defaults.checkValue, - dataField: obj.options.dataField || this.defaults.dataField + url: obj.data.options.url || this.defaults.url, + method: obj.data.options.method || this.defaults.method, + checkField: obj.data.options.checkField || this.defaults.checkField, + checkValue: obj.data.options.checkValue || this.defaults.checkValue, + dataField: obj.data.options.dataField || this.defaults.dataField } - const result = Object.assign({}, obj, { options: Object.assign({}, obj.options, options) }) - return result + return this.config.mergeOptions(obj, options) } } } diff --git a/static/admin/src/views/bi/diagram/design/AnchorTransformProperty.vue b/static/admin/src/views/bi/diagram/design/AnchorTransformProperty.vue index f65e3761..120f0bd9 100644 --- a/static/admin/src/views/bi/diagram/design/AnchorTransformProperty.vue +++ b/static/admin/src/views/bi/diagram/design/AnchorTransformProperty.vue @@ -2,9 +2,18 @@ - +
参数配置
- 仅保留和转换数据字段的配置项 + 仅保留和转换数据字段的配置项 数据字段 import('./SliceBasic') }, props: { value: { type: Object, required: true }, + flow: { type: Object, required: true }, config: { type: Object, required: true }, - activeItem: { type: Object, required: true } + diagram: { type: Object, required: true }, + activeItem: { type: Object, default: null }, + tips: { type: String, default: '' } }, data () { return { @@ -66,13 +78,13 @@ export default { }, computed: { defaults () { - return this.config.widgetDefaults(this.value.type) + return this.config.widgetDefaults(this.value.data.type) } }, watch: { 'activeItem.id': { handler () { - this.sortTable.reset(this.value.options.items) + this.sortTable.reset(this.value.data.options.items) this.$emit('input', this.formatted(this.value)) }, immediate: true @@ -81,11 +93,10 @@ export default { methods: { formatted (obj) { const options = { - mode: !!obj.options.convertible, - items: Array.isArray(obj.options.items) ? obj.options.items : obj.defaults.items + mode: !!obj.data.options.convertible, + items: Array.isArray(obj.data.options.items) ? obj.data.options.items : this.defaults.items } - const result = Object.assign({}, obj, { options: Object.assign({}, obj.options, options) }) - return result + return this.config.mergeOptions(obj, options) }, rowItem () { return { field: '', clsType: '' } @@ -110,7 +121,7 @@ export default { }, mounted () { this.loadDAGConfig() - this.sortTable.reset(this.value.options.items) + this.sortTable.reset(this.value.data.options.items) } } diff --git a/static/admin/src/views/bi/diagram/design/CalendarOffsetConfigProperty.vue b/static/admin/src/views/bi/diagram/design/CalendarOffsetConfigProperty.vue index 893d6631..60f6dfca 100644 --- a/static/admin/src/views/bi/diagram/design/CalendarOffsetConfigProperty.vue +++ b/static/admin/src/views/bi/diagram/design/CalendarOffsetConfigProperty.vue @@ -2,31 +2,40 @@ - +
参数配置
- - + + - ms + ms - + - + {{ item.label }} - + {{ item.label }} - + {{ item.label }} - + {{ item.label }} @@ -43,8 +52,11 @@ export default { }, props: { value: { type: Object, required: true }, + flow: { type: Object, required: true }, config: { type: Object, required: true }, - activeItem: { type: Object, required: true } + diagram: { type: Object, required: true }, + activeItem: { type: Object, default: null }, + tips: { type: String, default: '' } }, data () { return { @@ -88,7 +100,7 @@ export default { }, computed: { defaults () { - return this.config.widgetDefaults(this.value.type) + return this.config.widgetDefaults(this.value.data.type) } }, watch: { @@ -102,14 +114,13 @@ export default { methods: { formatted (obj) { const options = { - arg: obj.options.arg || this.defaults.arg, - reference: obj.options.reference || this.defaults.reference, - pattern: obj.options.pattern || this.defaults.pattern, - timezone: obj.options.timezone || this.defaults.timezone, - locale: obj.options.locale || this.defaults.locale + arg: obj.data.options.arg || this.defaults.arg, + reference: obj.data.options.reference || this.defaults.reference, + pattern: obj.data.options.pattern || this.defaults.pattern, + timezone: obj.data.options.timezone || this.defaults.timezone, + locale: obj.data.options.locale || this.defaults.locale } - const result = Object.assign({}, obj, { options: Object.assign({}, obj.options, options) }) - return result + return this.config.mergeOptions(obj, options) } } } diff --git a/static/admin/src/views/bi/diagram/design/CanvasProperty.vue b/static/admin/src/views/bi/diagram/design/CanvasProperty.vue index 6f7359d3..c06380b1 100644 --- a/static/admin/src/views/bi/diagram/design/CanvasProperty.vue +++ b/static/admin/src/views/bi/diagram/design/CanvasProperty.vue @@ -1,13 +1,12 @@