Skip to content

Commit

Permalink
[Feature][Zeta] Added the metrics information of table statistics in …
Browse files Browse the repository at this point in the history
…multi-table mode (apache#7212)
  • Loading branch information
hawk9821 committed Jul 27, 2024
1 parent b6d88c6 commit 43dd0e6
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,18 @@ public void multiTableMetrics() {
.body("jobStatus", equalTo("FINISHED"))
.body("metrics.SourceReceivedCount", equalTo("50"))
.body("metrics.SinkWriteCount", equalTo("50"))
.body("metrics.TableSourceReceivedCount.fake1", equalTo("20"))
.body("metrics.TableSourceReceivedCount.fake2", equalTo("30"))
.body("metrics.TableSinkWriteCount.fake1", equalTo("20"))
.body("metrics.TableSinkWriteCount.fake2", equalTo("30"));
.body(
"metrics.TableSourceReceivedCount.'fake.table1'",
equalTo("20"))
.body(
"metrics.TableSourceReceivedCount.'fake.public.table2'",
equalTo("30"))
.body(
"metrics.TableSinkWriteCount.'fake.table1'",
equalTo("20"))
.body(
"metrics.TableSinkWriteCount.'fake.public.table2'",
equalTo("30"));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ source {
result_table_name = "fake1"
row.num = 20
schema = {
table = "fake.table1"
fields {
name = "string"
age = "int"
Expand All @@ -40,6 +41,7 @@ source {
result_table_name = "fake2"
row.num = 30
schema = {
table = "fake.public.table2"
fields {
name = "string"
age = "int"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,10 +587,11 @@ public void testGetMultiTableJobMetrics() {

String jobMetrics = jobClient.getJobMetrics(jobId);

Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake1"));
Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake2"));
Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake1"));
Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake2"));
Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake.table1"));
Assertions.assertTrue(
jobMetrics.contains(SOURCE_RECEIVED_COUNT + "#fake.public.table2"));
Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake.table1"));
Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + "#fake.public.table2"));

log.info("jobMetrics : {}", jobMetrics);
JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ source {
result_table_name = "fake1"
row.num = 20
schema = {
table = "fake.table1"
fields {
name = "string"
age = "int"
Expand All @@ -41,6 +42,7 @@ source {
result_table_name = "fake2"
row.num = 30
schema = {
table = "fake.public.table2"
fields {
name = "string"
age = "int"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ private Map<String, Object> getJobMetrics(String jobMetrics) {
.forEach(
metricName -> {
String tableName =
TablePath.of(metricName.split("#")[1]).getTableName();
TablePath.of(metricName.split("#")[1]).getFullName();
if (metricName.startsWith(SOURCE_RECEIVED_COUNT)) {
tableSourceReceivedCountMap.put(
tableName, jobMetricsStr.get(metricName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -45,7 +45,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -103,11 +102,9 @@ public SeaTunnelSourceCollector(
tablePaths.forEach(
tablePath ->
sourceReceivedCountPerTable.put(
tablePath.getTableName(),
getFullName(tablePath),
metricsContext.counter(
SOURCE_RECEIVED_COUNT
+ "#"
+ tablePath.getTableName())));
SOURCE_RECEIVED_COUNT + "#" + getFullName(tablePath))));
}
sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT);
sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS);
Expand All @@ -134,8 +131,7 @@ public void collect(T row) {
sourceReceivedBytesPerSeconds.markEvent(size);
flowControlGate.audit((SeaTunnelRow) row);
if (StringUtils.isNotEmpty(tableId)) {
String tableName =
Optional.of(TablePath.of(tableId).getTableName()).orElse("null");
String tableName = getFullName(TablePath.of(tableId));
Counter sourceTableCounter = sourceReceivedCountPerTable.get(tableName);
if (Objects.nonNull(sourceTableCounter)) {
sourceTableCounter.inc();
Expand Down Expand Up @@ -236,4 +232,12 @@ public void sendRecordToNext(Record<?> record) throws IOException {
}
}
}

private String getFullName(TablePath tablePath) {
if (StringUtils.isBlank(tablePath.getTableName())) {
tablePath =
TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default");
}
return tablePath.getFullName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
Expand All @@ -33,8 +34,6 @@
import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.record.Barrier;

import org.apache.commons.lang3.StringUtils;

import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.Getter;
Expand Down Expand Up @@ -89,16 +88,7 @@ public void init() throws Exception {
tablePaths =
producedCatalogTables.stream()
.map(CatalogTable::getTableId)
.filter(
tableIdentifier ->
StringUtils.isNotBlank(
tableIdentifier.getTableName()))
.map(
tableIdentifier ->
TablePath.of(
tableIdentifier.getDatabaseName(),
tableIdentifier.getSchemaName(),
tableIdentifier.getTableName()))
.map(TableIdentifier::toTablePath)
.collect(Collectors.toList());
} catch (UnsupportedOperationException e) {
// TODO remove it when all connector use `getProducedCatalogTables`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportResourceShare;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSink;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
Expand Down Expand Up @@ -138,9 +138,9 @@ public SinkFlowLifeCycle(
sinkTables.forEach(
tablePath ->
sinkWriteCountPerTable.put(
tablePath.getTableName(),
getFullName(tablePath),
metricsContext.counter(
SINK_WRITE_COUNT + "#" + tablePath.getTableName())));
SINK_WRITE_COUNT + "#" + getFullName(tablePath))));
}
}

Expand Down Expand Up @@ -275,8 +275,7 @@ public void received(Record<?> record) {
sinkWriteBytesPerSeconds.markEvent(size);
String tableId = ((SeaTunnelRow) record.getData()).getTableId();
if (StringUtils.isNotBlank(tableId)) {
String tableName =
Optional.of(TablePath.of(tableId).getTableName()).orElse("null");
String tableName = getFullName(TablePath.of(tableId));
Counter sinkTableCounter = sinkWriteCountPerTable.get(tableName);
if (Objects.nonNull(sinkTableCounter)) {
sinkTableCounter.inc();
Expand Down Expand Up @@ -346,4 +345,12 @@ public void restoreState(List<ActionSubtaskState> actionStateList) throws Except
((SupportResourceShare) this.writer).setMultiTableResourceManager(resourceManager, 0);
}
}

private String getFullName(TablePath tablePath) {
if (StringUtils.isBlank(tablePath.getTableName())) {
tablePath =
TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default");
}
return tablePath.getFullName();
}
}

0 comments on commit 43dd0e6

Please sign in to comment.