From e1db7aa90097a45ad08d6214eb4f64cf5a8fef5c Mon Sep 17 00:00:00 2001 From: hawk9821 Date: Tue, 16 Jul 2024 10:08:06 +0800 Subject: [PATCH 1/9] [Feature][Zeta] Added the metrics information of table statistics in multi-table mode --- .../sink/multitablesink/MultiTableSink.java | 5 + .../paimon/catalog/PaimonCatalog.java | 3 +- .../engine/e2e/MultiTableMetricsIT.java | 117 ++++++++++++++++++ .../batch_fake_multi_table_to_console.conf | 62 ++++++++++ .../engine/client/SeaTunnelClientTest.java | 113 +++++++++++++++++ .../batch_fake_multi_table_to_console.conf | 64 ++++++++++ .../seatunnel-engine-server/pom.xml | 5 + .../rest/RestHttpGetCommandProcessor.java | 74 ++++++++++- .../server/task/SeaTunnelSourceCollector.java | 53 ++++++-- .../server/task/SourceSeaTunnelTask.java | 17 ++- .../server/task/flow/SinkFlowLifeCycle.java | 38 ++++++ 11 files changed, 534 insertions(+), 17 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf create mode 100644 seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java index bb04283ca68..923ecff8b88 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSink.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.factory.MultiTableFactoryContext; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -149,6 +150,10 @@ public Optional> getCommitInfoSerializer() { return Optional.of(new MultiTableSinkAggregatedCommitter(aggCommitters)); } + public List getSinkTables() { + return sinks.keySet().stream().map(TablePath::of).collect(Collectors.toList()); + } + @Override public Optional> getAggregatedCommitInfoSerializer() { diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java index d896e015398..2c9fcd6f828 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java @@ -222,8 +222,7 @@ private CatalogTable toCatalogTable( BasicTypeDefine.builder() .name(dataField.name()) .comment(dataField.description()) - .nativeType(dataField.type()) - .nullable(dataField.type().isNullable()); + .nativeType(dataField.type()); Column column = SchemaUtil.toSeaTunnelType(typeDefineBuilder.build()); builder.column(column); }); diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java new file mode 100644 index 00000000000..1cab231187f --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e; + +import org.apache.seatunnel.engine.client.SeaTunnelClient; +import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; +import org.apache.seatunnel.engine.client.job.ClientJobProxy; +import org.apache.seatunnel.engine.common.config.ConfigProvider; +import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; +import org.apache.seatunnel.engine.server.rest.RestConstant; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.instance.impl.HazelcastInstanceImpl; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static io.restassured.RestAssured.given; +import static org.hamcrest.Matchers.equalTo; + +public class MultiTableMetricsIT { + + private static final String HOST = "http://localhost:"; + + private static ClientJobProxy batchJobProxy; + + private static HazelcastInstanceImpl node1; + + private static SeaTunnelClient engineClient; + + @BeforeEach + void beforeClass() throws Exception { + String testClusterName = TestUtils.getClusterName("RestApiIT"); + SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); + seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName); + node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(testClusterName); + engineClient = new SeaTunnelClient(clientConfig); + + String batchFilePath = TestUtils.getResource("batch_fake_multi_table_to_console.conf"); + JobConfig batchConf = new JobConfig(); + batchConf.setName("batch_fake_multi_table_to_console"); + ClientJobExecutionEnvironment batchJobExecutionEnv = + engineClient.createExecutionContext(batchFilePath, batchConf, seaTunnelConfig); + batchJobProxy = batchJobExecutionEnv.execute(); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + Assertions.assertEquals( + JobStatus.FINISHED, batchJobProxy.getJobStatus())); + } + + @Test + public void multiTableMetrics() { + Collections.singletonList(node1) + .forEach( + instance -> { + given().get( + HOST + + instance.getCluster() + .getLocalMember() + .getAddress() + .getPort() + + RestConstant.JOB_INFO_URL + + "/" + + batchJobProxy.getJobId()) + .then() + .statusCode(200) + .body("jobName", equalTo("batch_fake_multi_table_to_console")) + .body("jobStatus", equalTo("FINISHED")) + .body("metrics.SourceReceivedCount", equalTo("50")) + .body("metrics.SinkWriteCount", equalTo("50")) + .body("metrics.TableSourceReceivedCount.fake1", equalTo("20")) + .body("metrics.TableSourceReceivedCount.fake2", equalTo("30")) + .body("metrics.TableSinkWriteCount.fake1", equalTo("20")) + .body("metrics.TableSinkWriteCount.fake2", equalTo("30")); + }); + } + + @AfterEach + void afterClass() { + if (engineClient != null) { + engineClient.close(); + } + + if (node1 != null) { + node1.shutdown(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf new file mode 100644 index 00000000000..f38da63ea3f --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake1" + row.num = 20 + schema = { + fields { + name = "string" + age = "int" + } + } + } + + FakeSource { + result_table_name = "fake2" + row.num = 30 + schema = { + fields { + name = "string" + age = "int" + sex = "int" + } + } + } +} + +transform { +} + +sink { + console { + source_table_name = "fake1" + } + console { + source_table_name = "fake2" + } +} diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index d7e55db4ec2..fecff30e7af 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.engine.client; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seatunnel.common.config.Common; @@ -51,10 +53,14 @@ import java.net.URL; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Spliterators; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT; import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS; @@ -548,6 +554,113 @@ public void testSavePointAndRestoreWithSavePoint() throws Exception { } } + @Test + public void testGetMultiTableJobMetrics() { + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("/batch_fake_multi_table_to_console.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName("testGetMultiTableJobMetrics"); + + SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); + JobClient jobClient = seaTunnelClient.getJobClient(); + + try { + ClientJobExecutionEnvironment jobExecutionEnv = + seaTunnelClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG); + + final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + CompletableFuture objectCompletableFuture = + CompletableFuture.supplyAsync( + () -> { + return clientJobProxy.waitForJobComplete(); + }); + long jobId = clientJobProxy.getJobId(); + + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertTrue( + jobClient.getJobDetailStatus(jobId).contains("FINISHED") + && jobClient + .listJobStatus(true) + .contains("FINISHED"))); + + String jobMetrics = jobClient.getJobMetrics(jobId); + + Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake1")); + Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake2")); + Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake1")); + Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake2")); + + log.info("jobMetrics : {}", jobMetrics); + JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics); + List metricNameList = + StreamSupport.stream( + Spliterators.spliteratorUnknownSize( + jobMetricsStr.fieldNames(), 0), + false) + .filter( + metricName -> + metricName.startsWith(SOURCE_RECEIVED_COUNT) + || metricName.startsWith(SINK_WRITE_COUNT)) + .collect(Collectors.toList()); + + Map totalCount = + metricNameList.stream() + .filter(metrics -> !metrics.contains("#")) + .collect( + Collectors.toMap( + metrics -> metrics, + metrics -> + StreamSupport.stream( + jobMetricsStr + .get(metrics) + .spliterator(), + false) + .mapToLong( + value -> + value.get("value") + .asLong()) + .sum())); + + Map tableCount = + metricNameList.stream() + .filter(metrics -> metrics.contains("#")) + .collect( + Collectors.toMap( + metrics -> metrics, + metrics -> + StreamSupport.stream( + jobMetricsStr + .get(metrics) + .spliterator(), + false) + .mapToLong( + value -> + value.get("value") + .asLong()) + .sum())); + + Assertions.assertEquals( + totalCount.get(SOURCE_RECEIVED_COUNT), + tableCount.entrySet().stream() + .filter(e -> e.getKey().startsWith(SOURCE_RECEIVED_COUNT)) + .mapToLong(Map.Entry::getValue) + .sum()); + Assertions.assertEquals( + totalCount.get(SINK_WRITE_COUNT), + tableCount.entrySet().stream() + .filter(e -> e.getKey().startsWith(SINK_WRITE_COUNT)) + .mapToLong(Map.Entry::getValue) + .sum()); + + } catch (ExecutionException | InterruptedException | JsonProcessingException e) { + throw new RuntimeException(e); + } finally { + seaTunnelClient.close(); + } + } + @AfterAll public static void after() { INSTANCE.shutdown(); diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf new file mode 100644 index 00000000000..51fc81dae2c --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake1" + row.num = 20 + schema = { + fields { + name = "string" + age = "int" + } + } + parallelism = 1 + } + + FakeSource { + result_table_name = "fake2" + row.num = 30 + schema = { + fields { + name = "string" + age = "int" + sex = "int" + } + } + parallelism = 1 + } +} + +transform { +} + +sink { + console { + source_table_name = "fake1" + } + console { + source_table_name = "fake2" + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/pom.xml b/seatunnel-engine/seatunnel-engine-server/pom.xml index abd296cb1e3..c38c398577d 100644 --- a/seatunnel-engine/seatunnel-engine-server/pom.xml +++ b/seatunnel-engine/seatunnel-engine-server/pom.xml @@ -33,6 +33,11 @@ seatunnel-engine-core ${project.version} + + org.apache.seatunnel + seatunnel-api + ${project.version} + org.apache.seatunnel checkpoint-storage-hdfs diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java index 6081b0f2eaf..96fd25eca27 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.seatunnel.api.common.metrics.JobMetrics; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.engine.common.Constant; @@ -64,8 +65,10 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.Spliterators; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500; import static org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO; @@ -79,7 +82,9 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor { private static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount"; + private static final String TABLE_SOURCE_RECEIVED_COUNT = "TableSourceReceivedCount"; private static final String SINK_WRITE_COUNT = "SinkWriteCount"; + private static final String TABLE_SINK_WRITE_COUNT = "TableSinkWriteCount"; private final Log4j2HttpGetCommandProcessor original; private NodeEngine nodeEngine; @@ -362,12 +367,31 @@ private void getRunningThread(HttpGetCommand command) { .collect(JsonArray::new, JsonArray::add, JsonArray::add)); } - private Map getJobMetrics(String jobMetrics) { - Map metricsMap = new HashMap<>(); + private Map getJobMetrics(String jobMetrics) { + Map metricsMap = new HashMap<>(); long sourceReadCount = 0L; long sinkWriteCount = 0L; + Map tableSourceReceivedCountMap = new HashMap<>(); + Map tableSinkWriteCountMap = new HashMap<>(); try { JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics); + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(jobMetricsStr.fieldNames(), 0), + false) + .filter(metricName -> metricName.contains("#")) + .forEach( + metricName -> { + String tableName = + TablePath.of(metricName.split("#")[1]).getTableName(); + if (metricName.startsWith(SOURCE_RECEIVED_COUNT)) { + tableSourceReceivedCountMap.put( + tableName, jobMetricsStr.get(metricName)); + } + if (metricName.startsWith(SOURCE_RECEIVED_COUNT)) { + tableSinkWriteCountMap.put( + tableName, jobMetricsStr.get(metricName)); + } + }); JsonNode sourceReceivedCountJson = jobMetricsStr.get(SOURCE_RECEIVED_COUNT); JsonNode sinkWriteCountJson = jobMetricsStr.get(SINK_WRITE_COUNT); for (int i = 0; i < jobMetricsStr.get(SOURCE_RECEIVED_COUNT).size(); i++) { @@ -379,9 +403,36 @@ private Map getJobMetrics(String jobMetrics) { } catch (JsonProcessingException | NullPointerException e) { return metricsMap; } + + Map tableSourceReceivedCount = + tableSourceReceivedCountMap.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> + StreamSupport.stream( + entry.getValue().spliterator(), + false) + .mapToLong( + node -> node.get("value").asLong()) + .sum())); + Map tableSinkWriteCount = + tableSinkWriteCountMap.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> + StreamSupport.stream( + entry.getValue().spliterator(), + false) + .mapToLong( + node -> node.get("value").asLong()) + .sum())); + metricsMap.put(SOURCE_RECEIVED_COUNT, sourceReadCount); metricsMap.put(SINK_WRITE_COUNT, sinkWriteCount); - + metricsMap.put(TABLE_SOURCE_RECEIVED_COUNT, tableSourceReceivedCount); + metricsMap.put(TABLE_SINK_WRITE_COUNT, tableSinkWriteCount); return metricsMap; } @@ -475,11 +526,24 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) { .add( RestConstant.IS_START_WITH_SAVE_POINT, jobImmutableInformation.isStartWithSavePoint()) - .add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics))); + .add(RestConstant.METRICS, toJsonObject(getJobMetrics(jobMetrics))); return jobInfoJson; } + private JsonObject toJsonObject(Map jobMetrics) { + JsonObject members = new JsonObject(); + jobMetrics.forEach( + (key, value) -> { + if (value instanceof Map) { + members.add(key, toJsonObject((Map) value)); + } else { + members.add(key, value.toString()); + } + }); + return members; + } + private JsonObject getJobInfoJson(JobState jobState, String jobMetrics, JobDAGInfo jobDAGInfo) { return new JsonObject() .add(RestConstant.JOB_ID, String.valueOf(jobState.getJobId())) @@ -498,6 +562,6 @@ private JsonObject getJobInfoJson(JobState jobState, String jobMetrics, JobDAGIn DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)) .add(RestConstant.JOB_DAG, JsonUtils.toJsonString(jobDAGInfo)) .add(RestConstant.PLUGIN_JARS_URLS, new JsonArray()) - .add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics))); + .add(RestConstant.METRICS, toJsonObject(getJobMetrics(jobMetrics))); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java index f5d4aed1ab4..160324da700 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.metrics.Meter; import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher; import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler; @@ -34,12 +35,17 @@ import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES; @@ -54,12 +60,16 @@ public class SeaTunnelSourceCollector implements Collector { private final List>> outputs; + private final MetricsContext metricsContext; + private final AtomicBoolean schemaChangeBeforeCheckpointSignal = new AtomicBoolean(false); private final AtomicBoolean schemaChangeAfterCheckpointSignal = new AtomicBoolean(false); private final Counter sourceReceivedCount; + private final Map sourceReceivedCountPerTable = new ConcurrentHashMap<>(); + private final Meter sourceReceivedQPS; private final Counter sourceReceivedBytes; @@ -77,17 +87,24 @@ public SeaTunnelSourceCollector( List>> outputs, MetricsContext metricsContext, FlowControlStrategy flowControlStrategy, - SeaTunnelDataType rowType) { + SeaTunnelDataType rowType, + List tablePaths) { this.checkpointLock = checkpointLock; this.outputs = outputs; this.rowType = rowType; + this.metricsContext = metricsContext; if (rowType instanceof MultipleRowType) { ((MultipleRowType) rowType) .iterator() - .forEachRemaining( - type -> { - this.rowTypeMap.put(type.getKey(), type.getValue()); - }); + .forEachRemaining(type -> this.rowTypeMap.put(type.getKey(), type.getValue())); + } + if (CollectionUtils.isNotEmpty(tablePaths)) { + tablePaths.forEach( + tablePath -> + sourceReceivedCountPerTable.put( + getFullName(tablePath), + metricsContext.counter( + SOURCE_RECEIVED_COUNT + "#" + getFullName(tablePath)))); } sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT); sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS); @@ -96,18 +113,24 @@ public SeaTunnelSourceCollector( flowControlGate = FlowControlGate.create(flowControlStrategy); } + private String getFullName(TablePath tablePath) { + if (StringUtils.isBlank(tablePath.getTableName())) { + tablePath = + TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default"); + } + return tablePath.getFullName(); + } + @Override public void collect(T row) { try { if (row instanceof SeaTunnelRow) { + String tableId = ((SeaTunnelRow) row).getTableId(); int size; if (rowType instanceof SeaTunnelRowType) { size = ((SeaTunnelRow) row).getBytesSize((SeaTunnelRowType) rowType); } else if (rowType instanceof MultipleRowType) { - size = - ((SeaTunnelRow) row) - .getBytesSize( - rowTypeMap.get(((SeaTunnelRow) row).getTableId())); + size = ((SeaTunnelRow) row).getBytesSize(rowTypeMap.get(tableId)); } else { throw new SeaTunnelEngineException( "Unsupported row type: " + rowType.getClass().getName()); @@ -115,6 +138,18 @@ public void collect(T row) { sourceReceivedBytes.inc(size); sourceReceivedBytesPerSeconds.markEvent(size); flowControlGate.audit((SeaTunnelRow) row); + if (StringUtils.isNotEmpty(tableId)) { + String tableName = getFullName(TablePath.of(tableId)); + Counter sourceTableCounter = sourceReceivedCountPerTable.get(tableName); + if (Objects.nonNull(sourceTableCounter)) { + sourceTableCounter.inc(); + } else { + Counter counter = + metricsContext.counter(SOURCE_RECEIVED_COUNT + "#" + tableName); + counter.inc(); + sourceReceivedCountPerTable.put(tableName, counter); + } + } } sendRecordToNext(new Record<>(row)); emptyThisPollNext = false; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java index 53171d40315..dfbe25beea8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy; import org.apache.seatunnel.engine.core.dag.actions.SourceAction; @@ -37,9 +38,11 @@ import lombok.Getter; import lombok.NonNull; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; public class SourceSeaTunnelTask extends SeaTunnelTask { @@ -76,10 +79,21 @@ public void init() throws Exception { + startFlowLifeCycle.getClass().getName()); } else { SeaTunnelDataType sourceProducedType; + List tablePaths = new ArrayList<>(); try { List producedCatalogTables = sourceFlow.getAction().getSource().getProducedCatalogTables(); sourceProducedType = CatalogTableUtil.convertToDataType(producedCatalogTables); + tablePaths = + producedCatalogTables.stream() + .map(CatalogTable::getTableId) + .map( + tableIdentifier -> + TablePath.of( + tableIdentifier.getDatabaseName(), + tableIdentifier.getSchemaName(), + tableIdentifier.getTableName())) + .collect(Collectors.toList()); } catch (UnsupportedOperationException e) { // TODO remove it when all connector use `getProducedCatalogTables` sourceProducedType = sourceFlow.getAction().getSource().getProducedType(); @@ -90,7 +104,8 @@ public void init() throws Exception { outputs, this.getMetricsContext(), FlowControlStrategy.fromMap(envOption), - sourceProducedType); + sourceProducedType, + tablePaths); ((SourceFlowLifeCycle) startFlowLifeCycle).setCollector(collector); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 48c530a0c36..516e1c97c41 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -26,6 +26,8 @@ import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportResourceShare; +import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -43,6 +45,8 @@ import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation; import org.apache.seatunnel.engine.server.task.record.Barrier; +import org.apache.commons.lang3.StringUtils; + import com.hazelcast.cluster.Address; import lombok.extern.slf4j.Slf4j; @@ -52,9 +56,11 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -92,6 +98,8 @@ public class SinkFlowLifeCycle sinkWriteCountPerTable = new ConcurrentHashMap<>(); + private Meter sinkWriteQPS; private Counter sinkWriteBytes; @@ -125,6 +133,15 @@ public SinkFlowLifeCycle( sinkWriteQPS = metricsContext.meter(SINK_WRITE_QPS); sinkWriteBytes = metricsContext.counter(SINK_WRITE_BYTES); sinkWriteBytesPerSeconds = metricsContext.meter(SINK_WRITE_BYTES_PER_SECONDS); + if (sinkAction.getSink() instanceof MultiTableSink) { + List sinkTables = ((MultiTableSink) sinkAction.getSink()).getSinkTables(); + sinkTables.forEach( + tablePath -> + sinkWriteCountPerTable.put( + getFullName(tablePath), + metricsContext.counter( + SINK_WRITE_COUNT + "#" + getFullName(tablePath)))); + } } @Override @@ -256,6 +273,19 @@ public void received(Record record) { long size = ((SeaTunnelRow) record.getData()).getBytesSize(); sinkWriteBytes.inc(size); sinkWriteBytesPerSeconds.markEvent(size); + String tableId = ((SeaTunnelRow) record.getData()).getTableId(); + if (StringUtils.isNotBlank(tableId)) { + String tableName = getFullName(TablePath.of(tableId)); + Counter sinkTableCounter = sinkWriteCountPerTable.get(tableName); + if (Objects.nonNull(sinkTableCounter)) { + sinkTableCounter.inc(); + } else { + Counter counter = + metricsContext.counter(SINK_WRITE_COUNT + "#" + tableName); + counter.inc(); + sinkWriteCountPerTable.put(tableName, counter); + } + } } } } catch (Exception e) { @@ -315,4 +345,12 @@ public void restoreState(List actionStateList) throws Except ((SupportResourceShare) this.writer).setMultiTableResourceManager(resourceManager, 0); } } + + private String getFullName(TablePath tablePath) { + if (StringUtils.isBlank(tablePath.getTableName())) { + tablePath = + TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default"); + } + return tablePath.getFullName(); + } } From d0227061680de8e22ee966a1631d86adf3f6b895 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Wed, 24 Jul 2024 12:00:22 +0800 Subject: [PATCH 2/9] update --- .../connectors/seatunnel/paimon/catalog/PaimonCatalog.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java index 2c9fcd6f828..d896e015398 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java @@ -222,7 +222,8 @@ private CatalogTable toCatalogTable( BasicTypeDefine.builder() .name(dataField.name()) .comment(dataField.description()) - .nativeType(dataField.type()); + .nativeType(dataField.type()) + .nullable(dataField.type().isNullable()); Column column = SchemaUtil.toSeaTunnelType(typeDefineBuilder.build()); builder.column(column); }); From 5017b9f6ad0ea47a41a4599a43019dcf8f73bbf0 Mon Sep 17 00:00:00 2001 From: zhangdonghao <39961809+hawk9821@users.noreply.github.com> Date: Wed, 24 Jul 2024 12:30:38 +0800 Subject: [PATCH 3/9] Update seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf Co-authored-by: Jia Fan --- .../src/test/resources/batch_fake_multi_table_to_console.conf | 1 + 1 file changed, 1 insertion(+) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf index f38da63ea3f..75b7a8826f9 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf @@ -40,6 +40,7 @@ source { result_table_name = "fake2" row.num = 30 schema = { + table = "fake.public.table2" fields { name = "string" age = "int" From f5b8cf6612ca301e3a25caa723545ac7d8946f03 Mon Sep 17 00:00:00 2001 From: zhangdonghao <39961809+hawk9821@users.noreply.github.com> Date: Wed, 24 Jul 2024 12:36:22 +0800 Subject: [PATCH 4/9] Update seatunnel-engine/seatunnel-engine-server/pom.xml Co-authored-by: Jia Fan --- seatunnel-engine/seatunnel-engine-server/pom.xml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/pom.xml b/seatunnel-engine/seatunnel-engine-server/pom.xml index c38c398577d..abd296cb1e3 100644 --- a/seatunnel-engine/seatunnel-engine-server/pom.xml +++ b/seatunnel-engine/seatunnel-engine-server/pom.xml @@ -33,11 +33,6 @@ seatunnel-engine-core ${project.version} - - org.apache.seatunnel - seatunnel-api - ${project.version} - org.apache.seatunnel checkpoint-storage-hdfs From be9289bd154f649fcbdf2eb98ffe4344fca3ae2e Mon Sep 17 00:00:00 2001 From: zhangdonghao <39961809+hawk9821@users.noreply.github.com> Date: Wed, 24 Jul 2024 12:36:37 +0800 Subject: [PATCH 5/9] Update seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf Co-authored-by: Jia Fan --- .../src/test/resources/batch_fake_multi_table_to_console.conf | 1 + 1 file changed, 1 insertion(+) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf index 75b7a8826f9..c51929a0edb 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf @@ -29,6 +29,7 @@ source { result_table_name = "fake1" row.num = 20 schema = { + table = "fake.table1" fields { name = "string" age = "int" From e265654c640413efc2108966150143e29148ea01 Mon Sep 17 00:00:00 2001 From: hawk9821 Date: Tue, 16 Jul 2024 10:08:06 +0800 Subject: [PATCH 6/9] [Feature][Zeta engine] Added the metrics information of table statistics in multi-table mode #6959 --- .github/workflows/backend.yml | 78 +++++++++++++++++-- .../server/task/SeaTunnelSourceCollector.java | 17 +--- .../server/task/flow/SinkFlowLifeCycle.java | 19 +---- 3 files changed, 73 insertions(+), 41 deletions(-) diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 88a2d59e3f1..1e9c02f45a6 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -313,7 +313,7 @@ jobs: - name: run updated modules integration test (part-1) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 0` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 0` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -344,7 +344,7 @@ jobs: - name: run updated modules integration test (part-2) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 1` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 1` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -375,7 +375,7 @@ jobs: - name: run updated modules integration test (part-3) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 2` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 2` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -406,7 +406,7 @@ jobs: - name: run updated modules integration test (part-4) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 3` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 3` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -436,7 +436,7 @@ jobs: - name: run updated modules integration test (part-5) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 4` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 4` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -466,7 +466,7 @@ jobs: - name: run updated modules integration test (part-6) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 5` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 5` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -496,7 +496,7 @@ jobs: - name: run updated modules integration test (part-7) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 6` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 6` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -527,7 +527,69 @@ jobs: - name: run updated modules integration test (part-8) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 7` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 7` + if [ ! -z $sub_modules ]; then + ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci + else + echo "sub modules is empty, skipping" + fi + env: + MAVEN_OPTS: -Xmx2048m + + updated-modules-integration-test-part-9: + needs: [ changes, sanity-check ] + if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' + runs-on: ${{ matrix.os }} + strategy: + matrix: + java: [ '8', '11' ] + os: [ 'ubuntu-latest' ] + timeout-minutes: 90 + steps: + - uses: actions/checkout@v2 + - name: Set up JDK ${{ matrix.java }} + uses: actions/setup-java@v3 + with: + java-version: ${{ matrix.java }} + distribution: 'temurin' + cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh + - name: run updated modules integration test (part-8) + if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' + run: | + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 8` + if [ ! -z $sub_modules ]; then + ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci + else + echo "sub modules is empty, skipping" + fi + env: + MAVEN_OPTS: -Xmx2048m + + updated-modules-integration-test-part-10: + needs: [ changes, sanity-check ] + if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' + runs-on: ${{ matrix.os }} + strategy: + matrix: + java: [ '8', '11' ] + os: [ 'ubuntu-latest' ] + timeout-minutes: 90 + steps: + - uses: actions/checkout@v2 + - name: Set up JDK ${{ matrix.java }} + uses: actions/setup-java@v3 + with: + java-version: ${{ matrix.java }} + distribution: 'temurin' + cache: 'maven' + - name: free disk space + run: tools/github/free_disk_space.sh + - name: run updated modules integration test (part-8) + if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' + run: | + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 9` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java index 160324da700..2b9abed83a2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java @@ -17,11 +17,11 @@ package org.apache.seatunnel.engine.server.task; +import lombok.extern.slf4j.Slf4j; import org.apache.seatunnel.api.common.metrics.Counter; import org.apache.seatunnel.api.common.metrics.Meter; import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.source.Collector; -import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher; import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler; @@ -35,17 +35,10 @@ import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; - -import lombok.extern.slf4j.Slf4j; - import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES; @@ -113,14 +106,6 @@ public SeaTunnelSourceCollector( flowControlGate = FlowControlGate.create(flowControlStrategy); } - private String getFullName(TablePath tablePath) { - if (StringUtils.isBlank(tablePath.getTableName())) { - tablePath = - TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default"); - } - return tablePath.getFullName(); - } - @Override public void collect(T row) { try { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 516e1c97c41..b17393d3510 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.engine.server.task.flow; +import com.hazelcast.cluster.Address; +import lombok.extern.slf4j.Slf4j; import org.apache.seatunnel.api.common.metrics.Counter; import org.apache.seatunnel.api.common.metrics.Meter; import org.apache.seatunnel.api.common.metrics.MetricsContext; @@ -26,8 +28,6 @@ import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportResourceShare; -import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink; -import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -45,22 +45,15 @@ import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation; import org.apache.seatunnel.engine.server.task.record.Barrier; -import org.apache.commons.lang3.StringUtils; - -import com.hazelcast.cluster.Address; -import lombok.extern.slf4j.Slf4j; - import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -345,12 +338,4 @@ public void restoreState(List actionStateList) throws Except ((SupportResourceShare) this.writer).setMultiTableResourceManager(resourceManager, 0); } } - - private String getFullName(TablePath tablePath) { - if (StringUtils.isBlank(tablePath.getTableName())) { - tablePath = - TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default"); - } - return tablePath.getFullName(); - } } From f4482348b39a155609992fe0f45e25ab2111b40e Mon Sep 17 00:00:00 2001 From: hawk9821 Date: Thu, 18 Jul 2024 18:08:57 +0800 Subject: [PATCH 7/9] Revert "The isNullable attribute is true when the primary key field in the Paimon table converts the Column object. #7231" This reverts commit 83b3b4fad486936df7b51f352ac5463040d35b53. --- .../connectors/seatunnel/paimon/catalog/PaimonCatalog.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java index d896e015398..2c9fcd6f828 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java @@ -222,8 +222,7 @@ private CatalogTable toCatalogTable( BasicTypeDefine.builder() .name(dataField.name()) .comment(dataField.description()) - .nativeType(dataField.type()) - .nullable(dataField.type().isNullable()); + .nativeType(dataField.type()); Column column = SchemaUtil.toSeaTunnelType(typeDefineBuilder.build()); builder.column(column); }); From 87767e511c551f5c3aafab71af4470f82d7c975e Mon Sep 17 00:00:00 2001 From: hawk9821 Date: Wed, 24 Jul 2024 15:18:08 +0800 Subject: [PATCH 8/9] [Feature][Zeta engine] Added the metrics information of table statistics in multi-table mode #6959 --- .../paimon/catalog/PaimonCatalog.java | 3 ++- .../engine/e2e/MultiTableMetricsIT.java | 16 ++++++++++++---- .../engine/client/SeaTunnelClientTest.java | 9 +++++---- .../batch_fake_multi_table_to_console.conf | 2 ++ .../rest/RestHttpGetCommandProcessor.java | 2 +- .../server/task/SeaTunnelSourceCollector.java | 17 ++++++++++++++++- .../server/task/SourceSeaTunnelTask.java | 8 ++------ .../server/task/flow/SinkFlowLifeCycle.java | 19 +++++++++++++++++-- 8 files changed, 57 insertions(+), 19 deletions(-) diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java index 2c9fcd6f828..d896e015398 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java @@ -222,7 +222,8 @@ private CatalogTable toCatalogTable( BasicTypeDefine.builder() .name(dataField.name()) .comment(dataField.description()) - .nativeType(dataField.type()); + .nativeType(dataField.type()) + .nullable(dataField.type().isNullable()); Column column = SchemaUtil.toSeaTunnelType(typeDefineBuilder.build()); builder.column(column); }); diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java index 1cab231187f..59942eb4cc8 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java @@ -97,10 +97,18 @@ public void multiTableMetrics() { .body("jobStatus", equalTo("FINISHED")) .body("metrics.SourceReceivedCount", equalTo("50")) .body("metrics.SinkWriteCount", equalTo("50")) - .body("metrics.TableSourceReceivedCount.fake1", equalTo("20")) - .body("metrics.TableSourceReceivedCount.fake2", equalTo("30")) - .body("metrics.TableSinkWriteCount.fake1", equalTo("20")) - .body("metrics.TableSinkWriteCount.fake2", equalTo("30")); + .body( + "metrics.TableSourceReceivedCount.'fake.table1'", + equalTo("20")) + .body( + "metrics.TableSourceReceivedCount.'fake.public.table2'", + equalTo("30")) + .body( + "metrics.TableSinkWriteCount.'fake.table1'", + equalTo("20")) + .body( + "metrics.TableSinkWriteCount.'fake.public.table2'", + equalTo("30")); }); } diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index fecff30e7af..100aa0b3203 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -587,10 +587,11 @@ public void testGetMultiTableJobMetrics() { String jobMetrics = jobClient.getJobMetrics(jobId); - Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake1")); - Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake2")); - Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake1")); - Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake2")); + Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake.table1")); + Assertions.assertTrue( + jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake.public.table2")); + Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake.table1")); + Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake.public.table2")); log.info("jobMetrics : {}", jobMetrics); JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics); diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf index 51fc81dae2c..df7ae51fe6e 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf @@ -29,6 +29,7 @@ source { result_table_name = "fake1" row.num = 20 schema = { + table = "fake.table1" fields { name = "string" age = "int" @@ -41,6 +42,7 @@ source { result_table_name = "fake2" row.num = 30 schema = { + table = "fake.public.table2" fields { name = "string" age = "int" diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java index 96fd25eca27..d5d60b7cbb4 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java @@ -382,7 +382,7 @@ private Map getJobMetrics(String jobMetrics) { .forEach( metricName -> { String tableName = - TablePath.of(metricName.split("#")[1]).getTableName(); + TablePath.of(metricName.split("#")[1]).getFullName(); if (metricName.startsWith(SOURCE_RECEIVED_COUNT)) { tableSourceReceivedCountMap.put( tableName, jobMetricsStr.get(metricName)); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java index 2b9abed83a2..62612d0617a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java @@ -17,11 +17,11 @@ package org.apache.seatunnel.engine.server.task; -import lombok.extern.slf4j.Slf4j; import org.apache.seatunnel.api.common.metrics.Counter; import org.apache.seatunnel.api.common.metrics.Meter; import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher; import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler; @@ -35,10 +35,17 @@ import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import lombok.extern.slf4j.Slf4j; + import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES; @@ -225,4 +232,12 @@ public void sendRecordToNext(Record record) throws IOException { } } } + + private String getFullName(TablePath tablePath) { + if (StringUtils.isBlank(tablePath.getTableName())) { + tablePath = + TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default"); + } + return tablePath.getFullName(); + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java index dfbe25beea8..dbcde3e9d6e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy; @@ -87,12 +88,7 @@ public void init() throws Exception { tablePaths = producedCatalogTables.stream() .map(CatalogTable::getTableId) - .map( - tableIdentifier -> - TablePath.of( - tableIdentifier.getDatabaseName(), - tableIdentifier.getSchemaName(), - tableIdentifier.getTableName())) + .map(TableIdentifier::toTablePath) .collect(Collectors.toList()); } catch (UnsupportedOperationException e) { // TODO remove it when all connector use `getProducedCatalogTables` diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index b17393d3510..516e1c97c41 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -17,8 +17,6 @@ package org.apache.seatunnel.engine.server.task.flow; -import com.hazelcast.cluster.Address; -import lombok.extern.slf4j.Slf4j; import org.apache.seatunnel.api.common.metrics.Counter; import org.apache.seatunnel.api.common.metrics.Meter; import org.apache.seatunnel.api.common.metrics.MetricsContext; @@ -28,6 +26,8 @@ import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportResourceShare; +import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -45,15 +45,22 @@ import org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation; import org.apache.seatunnel.engine.server.task.record.Barrier; +import org.apache.commons.lang3.StringUtils; + +import com.hazelcast.cluster.Address; +import lombok.extern.slf4j.Slf4j; + import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -338,4 +345,12 @@ public void restoreState(List actionStateList) throws Except ((SupportResourceShare) this.writer).setMultiTableResourceManager(resourceManager, 0); } } + + private String getFullName(TablePath tablePath) { + if (StringUtils.isBlank(tablePath.getTableName())) { + tablePath = + TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default"); + } + return tablePath.getFullName(); + } } From ba73d2724446f464f69ec2edec7e0bd534d5952a Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 25 Jul 2024 10:36:32 +0800 Subject: [PATCH 9/9] revert ci --- .github/workflows/backend.yml | 78 ++++------------------------------- 1 file changed, 8 insertions(+), 70 deletions(-) diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 1e9c02f45a6..88a2d59e3f1 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -313,7 +313,7 @@ jobs: - name: run updated modules integration test (part-1) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 0` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 0` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -344,7 +344,7 @@ jobs: - name: run updated modules integration test (part-2) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 1` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 1` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -375,7 +375,7 @@ jobs: - name: run updated modules integration test (part-3) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 2` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 2` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -406,7 +406,7 @@ jobs: - name: run updated modules integration test (part-4) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 3` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 3` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -436,7 +436,7 @@ jobs: - name: run updated modules integration test (part-5) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 4` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 4` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -466,7 +466,7 @@ jobs: - name: run updated modules integration test (part-6) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 5` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 5` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -496,7 +496,7 @@ jobs: - name: run updated modules integration test (part-7) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 6` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 6` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -527,69 +527,7 @@ jobs: - name: run updated modules integration test (part-8) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 7` - if [ ! -z $sub_modules ]; then - ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci - else - echo "sub modules is empty, skipping" - fi - env: - MAVEN_OPTS: -Xmx2048m - - updated-modules-integration-test-part-9: - needs: [ changes, sanity-check ] - if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' - runs-on: ${{ matrix.os }} - strategy: - matrix: - java: [ '8', '11' ] - os: [ 'ubuntu-latest' ] - timeout-minutes: 90 - steps: - - uses: actions/checkout@v2 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v3 - with: - java-version: ${{ matrix.java }} - distribution: 'temurin' - cache: 'maven' - - name: free disk space - run: tools/github/free_disk_space.sh - - name: run updated modules integration test (part-8) - if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' - run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 8` - if [ ! -z $sub_modules ]; then - ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci - else - echo "sub modules is empty, skipping" - fi - env: - MAVEN_OPTS: -Xmx2048m - - updated-modules-integration-test-part-10: - needs: [ changes, sanity-check ] - if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' - runs-on: ${{ matrix.os }} - strategy: - matrix: - java: [ '8', '11' ] - os: [ 'ubuntu-latest' ] - timeout-minutes: 90 - steps: - - uses: actions/checkout@v2 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v3 - with: - java-version: ${{ matrix.java }} - distribution: 'temurin' - cache: 'maven' - - name: free disk space - run: tools/github/free_disk_space.sh - - name: run updated modules integration test (part-8) - if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' - run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 9` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 7` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else