From 43dd0e62c0e5fbc785ab6062cbaa636d9326405f Mon Sep 17 00:00:00 2001 From: zhangdonghao <39961809+hawk9821@users.noreply.github.com> Date: Fri, 26 Jul 2024 10:44:09 +0800 Subject: [PATCH] [Feature][Zeta] Added the metrics information of table statistics in multi-table mode (#7212) --- .../engine/e2e/MultiTableMetricsIT.java | 16 +++++++++++---- .../batch_fake_multi_table_to_console.conf | 2 ++ .../engine/client/SeaTunnelClientTest.java | 9 +++++---- .../batch_fake_multi_table_to_console.conf | 2 ++ .../rest/RestHttpGetCommandProcessor.java | 2 +- .../server/task/SeaTunnelSourceCollector.java | 20 +++++++++++-------- .../server/task/SourceSeaTunnelTask.java | 14 ++----------- .../server/task/flow/SinkFlowLifeCycle.java | 17 +++++++++++----- 8 files changed, 48 insertions(+), 34 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java index 1cab231187f8..59942eb4cc87 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java @@ -97,10 +97,18 @@ public void multiTableMetrics() { .body("jobStatus", equalTo("FINISHED")) .body("metrics.SourceReceivedCount", equalTo("50")) .body("metrics.SinkWriteCount", equalTo("50")) - .body("metrics.TableSourceReceivedCount.fake1", equalTo("20")) - .body("metrics.TableSourceReceivedCount.fake2", equalTo("30")) - .body("metrics.TableSinkWriteCount.fake1", equalTo("20")) - .body("metrics.TableSinkWriteCount.fake2", equalTo("30")); + .body( + "metrics.TableSourceReceivedCount.'fake.table1'", + equalTo("20")) + .body( + "metrics.TableSourceReceivedCount.'fake.public.table2'", + equalTo("30")) + .body( + "metrics.TableSinkWriteCount.'fake.table1'", + equalTo("20")) + .body( + "metrics.TableSinkWriteCount.'fake.public.table2'", + equalTo("30")); }); } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf index f38da63ea3f7..c51929a0edb5 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf @@ -29,6 +29,7 @@ source { result_table_name = "fake1" row.num = 20 schema = { + table = "fake.table1" fields { name = "string" age = "int" @@ -40,6 +41,7 @@ source { result_table_name = "fake2" row.num = 30 schema = { + table = "fake.public.table2" fields { name = "string" age = "int" diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index fecff30e7af1..100aa0b3203f 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -587,10 +587,11 @@ public void testGetMultiTableJobMetrics() { String jobMetrics = jobClient.getJobMetrics(jobId); - Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake1")); - Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake2")); - Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake1")); - Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake2")); + Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake.table1")); + Assertions.assertTrue( + jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake.public.table2")); + Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake.table1")); + Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake.public.table2")); log.info("jobMetrics : {}", jobMetrics); JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics); diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf index 51fc81dae2ca..df7ae51fe6eb 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf +++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_multi_table_to_console.conf @@ -29,6 +29,7 @@ source { result_table_name = "fake1" row.num = 20 schema = { + table = "fake.table1" fields { name = "string" age = "int" @@ -41,6 +42,7 @@ source { result_table_name = "fake2" row.num = 30 schema = { + table = "fake.public.table2" fields { name = "string" age = "int" diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java index 96fd25eca27a..d5d60b7cbb44 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java @@ -382,7 +382,7 @@ private Map getJobMetrics(String jobMetrics) { .forEach( metricName -> { String tableName = - TablePath.of(metricName.split("#")[1]).getTableName(); + TablePath.of(metricName.split("#")[1]).getFullName(); if (metricName.startsWith(SOURCE_RECEIVED_COUNT)) { tableSourceReceivedCountMap.put( tableName, jobMetricsStr.get(metricName)); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java index 86b12de98b4d..62612d0617a7 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java @@ -35,7 +35,7 @@ import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle; -import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import lombok.extern.slf4j.Slf4j; @@ -45,7 +45,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -103,11 +102,9 @@ public SeaTunnelSourceCollector( tablePaths.forEach( tablePath -> sourceReceivedCountPerTable.put( - tablePath.getTableName(), + getFullName(tablePath), metricsContext.counter( - SOURCE_RECEIVED_COUNT - + "#" - + tablePath.getTableName()))); + SOURCE_RECEIVED_COUNT + "#" + getFullName(tablePath)))); } sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT); sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS); @@ -134,8 +131,7 @@ public void collect(T row) { sourceReceivedBytesPerSeconds.markEvent(size); flowControlGate.audit((SeaTunnelRow) row); if (StringUtils.isNotEmpty(tableId)) { - String tableName = - Optional.of(TablePath.of(tableId).getTableName()).orElse("null"); + String tableName = getFullName(TablePath.of(tableId)); Counter sourceTableCounter = sourceReceivedCountPerTable.get(tableName); if (Objects.nonNull(sourceTableCounter)) { sourceTableCounter.inc(); @@ -236,4 +232,12 @@ public void sendRecordToNext(Record record) throws IOException { } } } + + private String getFullName(TablePath tablePath) { + if (StringUtils.isBlank(tablePath.getTableName())) { + tablePath = + TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default"); + } + return tablePath.getFullName(); + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java index d46ed615feb8..dbcde3e9d6e2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy; @@ -33,8 +34,6 @@ import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle; import org.apache.seatunnel.engine.server.task.record.Barrier; -import org.apache.commons.lang3.StringUtils; - import com.hazelcast.logging.ILogger; import com.hazelcast.logging.Logger; import lombok.Getter; @@ -89,16 +88,7 @@ public void init() throws Exception { tablePaths = producedCatalogTables.stream() .map(CatalogTable::getTableId) - .filter( - tableIdentifier -> - StringUtils.isNotBlank( - tableIdentifier.getTableName())) - .map( - tableIdentifier -> - TablePath.of( - tableIdentifier.getDatabaseName(), - tableIdentifier.getSchemaName(), - tableIdentifier.getTableName())) + .map(TableIdentifier::toTablePath) .collect(Collectors.toList()); } catch (UnsupportedOperationException e) { // TODO remove it when all connector use `getProducedCatalogTables` diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 756327cf0094..516e1c97c41c 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -26,11 +26,11 @@ import org.apache.seatunnel.api.sink.SinkCommitter; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportResourceShare; +import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.Record; import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSink; import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener; import org.apache.seatunnel.engine.core.dag.actions.SinkAction; import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey; @@ -138,9 +138,9 @@ public SinkFlowLifeCycle( sinkTables.forEach( tablePath -> sinkWriteCountPerTable.put( - tablePath.getTableName(), + getFullName(tablePath), metricsContext.counter( - SINK_WRITE_COUNT + "#" + tablePath.getTableName()))); + SINK_WRITE_COUNT + "#" + getFullName(tablePath)))); } } @@ -275,8 +275,7 @@ public void received(Record record) { sinkWriteBytesPerSeconds.markEvent(size); String tableId = ((SeaTunnelRow) record.getData()).getTableId(); if (StringUtils.isNotBlank(tableId)) { - String tableName = - Optional.of(TablePath.of(tableId).getTableName()).orElse("null"); + String tableName = getFullName(TablePath.of(tableId)); Counter sinkTableCounter = sinkWriteCountPerTable.get(tableName); if (Objects.nonNull(sinkTableCounter)) { sinkTableCounter.inc(); @@ -346,4 +345,12 @@ public void restoreState(List actionStateList) throws Except ((SupportResourceShare) this.writer).setMultiTableResourceManager(resourceManager, 0); } } + + private String getFullName(TablePath tablePath) { + if (StringUtils.isBlank(tablePath.getTableName())) { + tablePath = + TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default"); + } + return tablePath.getFullName(); + } }