[Feature][Zeta engine] Added the metrics information of table statistics in multi-table mode apache#6959
hawk9821 committed Jun 26, 2024
1 parent a93c9d6 commit a15db44
Showing 8 changed files with 341 additions and 15 deletions.
@@ -29,6 +29,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -147,6 +148,10 @@ public Optional<Serializer<MultiTableCommitInfo>> getCommitInfoSerializer() {
return Optional.of(new MultiTableSinkAggregatedCommitter(aggCommitters));
}

public List<String> getSinkTables() {
return new ArrayList<>(sinks.keySet());
}

@Override
public Optional<Serializer<MultiTableAggregatedCommitInfo>>
getAggregatedCommitInfoSerializer() {
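A minimal usage sketch of the new getSinkTables() accessor (the helper class here is hypothetical, and the package path of MultiTableSink is an assumption; a fully constructed MultiTableSink is taken as given):

import java.util.List;

import org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSink; // assumed package

// Hypothetical helper: list the tables a MultiTableSink writes to.
final class SinkTableLister {
    static void printSinkTables(MultiTableSink sink) {
        // getSinkTables() exposes the keys of the per-table sink map
        List<String> tables = sink.getSinkTables();
        tables.forEach(table -> System.out.println("sink table: " + table));
    }
}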
@@ -17,6 +17,8 @@

package org.apache.seatunnel.engine.client;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.seatunnel.common.config.Common;
@@ -48,9 +50,14 @@
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
@@ -472,6 +479,109 @@ public void testSavePointAndRestoreWithSavePoint() throws Exception {
}
}

@Test
public void testGetMultiTableJobMetrics() {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/batch_fake_multi_table_to_console.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("testGetMultiTableJobMetrics");

SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
JobClient jobClient = seaTunnelClient.getJobClient();

try {
ClientJobExecutionEnvironment jobExecutionEnv =
seaTunnelClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG);

final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
CompletableFuture<JobStatus> objectCompletableFuture =
CompletableFuture.supplyAsync(
() -> {
return clientJobProxy.waitForJobComplete();
});
long jobId = clientJobProxy.getJobId();

await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
jobClient.getJobDetailStatus(jobId).contains("FINISHED")
&& jobClient
.listJobStatus(true)
.contains("FINISHED")));

String jobMetrics = jobClient.getJobMetrics(jobId);

log.info(jobMetrics);

JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
List<String> metricNameList =
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
jobMetricsStr.fieldNames(), 0),
false)
.filter(
metricName ->
metricName.startsWith(SOURCE_RECEIVED_COUNT)
|| metricName.startsWith(SINK_WRITE_COUNT))
.collect(Collectors.toList());

Map<String, Long> totalCount =
metricNameList.stream()
.filter(metrics -> !metrics.contains("#"))
.collect(
Collectors.toMap(
metrics -> metrics,
metrics ->
StreamSupport.stream(
jobMetricsStr
.get(metrics)
.spliterator(),
false)
.mapToLong(
value ->
value.get("value")
.asLong())
.sum()));

Map<String, Long> tableCount =
metricNameList.stream()
.filter(metrics -> metrics.contains("#"))
.collect(
Collectors.toMap(
metrics -> metrics,
metrics ->
StreamSupport.stream(
jobMetricsStr
.get(metrics)
.spliterator(),
false)
.mapToLong(
value ->
value.get("value")
.asLong())
.sum()));

Assertions.assertEquals(
totalCount.get(SOURCE_RECEIVED_COUNT),
tableCount.entrySet().stream()
.filter(e -> e.getKey().startsWith(SOURCE_RECEIVED_COUNT))
.mapToLong(Map.Entry::getValue)
.sum());
Assertions.assertEquals(
totalCount.get(SINK_WRITE_COUNT),
tableCount.entrySet().stream()
.filter(e -> e.getKey().startsWith(SINK_WRITE_COUNT))
.mapToLong(Map.Entry::getValue)
.sum());

} catch (ExecutionException | InterruptedException | JsonProcessingException e) {
throw new RuntimeException(e);
} finally {
seaTunnelClient.close();
}
}

@AfterAll
public static void after() {
INSTANCE.shutdown();
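The test depends on the metric-name convention introduced by this change: job-level metrics keep plain names (SourceReceivedCount, SinkWriteCount), while per-table metrics append "#" plus a table path (the exact path portion, e.g. "fake.table1", is an assumption here). A self-contained sketch of that split:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class MetricNameDemo {
    // Per-table metrics are distinguished by the "#" separator.
    static boolean isPerTableMetric(String name) {
        return name.contains("#");
    }

    // For "SinkWriteCount#fake.table1" this returns "fake.table1".
    static String tablePathOf(String name) {
        return name.split("#")[1];
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("SinkWriteCount", "SinkWriteCount#fake.table1");
        List<String> perTable = names.stream()
                .filter(MetricNameDemo::isPerTableMetric)
                .collect(Collectors.toList());
        System.out.println(perTable); // [SinkWriteCount#fake.table1]
        System.out.println(tablePathOf(perTable.get(0))); // fake.table1
    }
}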
@@ -0,0 +1,64 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in seatunnel config
######

env {
parallelism = 1
job.mode = "BATCH"
}

source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
FakeSource {
result_table_name = "fake1"
row.num = 20
schema = {
fields {
name = "string"
age = "int"
}
}
parallelism = 1
}

FakeSource {
result_table_name = "fake2"
row.num = 30
schema = {
fields {
name = "string"
age = "int"
sex = "int"
}
}
parallelism = 1
}
}

transform {
}

sink {
console {
source_table_name = "fake1"
}
console {
source_table_name = "fake2"
}
}
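Given the row.num settings above, fake1 should emit 20 rows and fake2 30, so the per-table counts are expected to sum to the aggregate of 50. A small sketch of the expected totals (the "#"-qualified metric-name format is an assumption):

import java.util.HashMap;
import java.util.Map;

final class ExpectedCounts {
    public static void main(String[] args) {
        Map<String, Long> expected = new HashMap<>();
        expected.put("SourceReceivedCount#fake1", 20L); // row.num of fake1
        expected.put("SourceReceivedCount#fake2", 30L); // row.num of fake2
        long aggregate = expected.values().stream().mapToLong(Long::longValue).sum();
        System.out.println(aggregate); // 50, the job-level SourceReceivedCount
    }
}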
5 changes: 5 additions & 0 deletions seatunnel-engine/seatunnel-engine-server/pom.xml
@@ -33,6 +33,11 @@
<artifactId>seatunnel-engine-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>checkpoint-storage-hdfs</artifactId>
@@ -22,6 +22,7 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.common.Constant;
@@ -64,7 +65,10 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Spliterators;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
import static org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO;
@@ -78,7 +82,9 @@
public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCommand> {

private static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount";
private static final String TABLE_SOURCE_RECEIVED_COUNT = "TableSourceReceivedCount";
private static final String SINK_WRITE_COUNT = "SinkWriteCount";
private static final String TABLE_SINK_WRITE_COUNT = "TableSinkWriteCount";
private final Log4j2HttpGetCommandProcessor original;
private NodeEngine nodeEngine;

@@ -349,12 +355,31 @@ private void getRunningThread(HttpGetCommand command) {
.collect(JsonArray::new, JsonArray::add, JsonArray::add));
}

private Map<String, Long> getJobMetrics(String jobMetrics) {
Map<String, Long> metricsMap = new HashMap<>();
private Map<String, Object> getJobMetrics(String jobMetrics) {
Map<String, Object> metricsMap = new HashMap<>();
long sourceReadCount = 0L;
long sinkWriteCount = 0L;
Map<String, JsonNode> tableSourceReceivedCountMap = new HashMap<>();
Map<String, JsonNode> tableSinkWriteCountMap = new HashMap<>();
try {
JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(jobMetricsStr.fieldNames(), 0),
false)
.filter(metricName -> metricName.contains("#"))
.forEach(
metricName -> {
String tableName =
TablePath.of(metricName.split("#")[1]).getTableName();
if (metricName.startsWith(SOURCE_RECEIVED_COUNT)) {
tableSourceReceivedCountMap.put(
tableName, jobMetricsStr.get(metricName));
}
if (metricName.startsWith(SINK_WRITE_COUNT)) {
tableSinkWriteCountMap.put(
tableName, jobMetricsStr.get(metricName));
}
});
JsonNode sourceReceivedCountJson = jobMetricsStr.get(SOURCE_RECEIVED_COUNT);
JsonNode sinkWriteCountJson = jobMetricsStr.get(SINK_WRITE_COUNT);
for (int i = 0; i < jobMetricsStr.get(SOURCE_RECEIVED_COUNT).size(); i++) {
@@ -366,9 +391,36 @@ private Map<String, Long> getJobMetrics(String jobMetrics) {
} catch (JsonProcessingException | NullPointerException e) {
return metricsMap;
}

Map<String, Long> tableSourceReceivedCount =
tableSourceReceivedCountMap.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry ->
StreamSupport.stream(
entry.getValue().spliterator(),
false)
.mapToLong(
node -> node.get("value").asLong())
.sum()));
Map<String, Long> tableSinkWriteCount =
tableSinkWriteCountMap.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry ->
StreamSupport.stream(
entry.getValue().spliterator(),
false)
.mapToLong(
node -> node.get("value").asLong())
.sum()));

metricsMap.put(SOURCE_RECEIVED_COUNT, sourceReadCount);
metricsMap.put(SINK_WRITE_COUNT, sinkWriteCount);

metricsMap.put(TABLE_SOURCE_RECEIVED_COUNT, tableSourceReceivedCount);
metricsMap.put(TABLE_SINK_WRITE_COUNT, tableSinkWriteCount);
return metricsMap;
}
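Each metric entry in the job-metrics JSON is an array of per-task samples, and the totals above are produced by summing each sample's "value" field. A standalone sketch of that aggregation (the sample JSON is illustrative; SeaTunnel itself uses the shaded Jackson under org.apache.seatunnel.shade):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.stream.StreamSupport;

final class MetricSumDemo {
    public static void main(String[] args) throws Exception {
        // Two tasks reporting 20 and 30 rows for the same metric.
        String samples = "[{\"value\": 20}, {\"value\": 30}]";
        JsonNode node = new ObjectMapper().readTree(samples);
        long total = StreamSupport.stream(node.spliterator(), false)
                .mapToLong(n -> n.get("value").asLong())
                .sum();
        System.out.println(total); // 50
    }
}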

@@ -462,11 +514,24 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) {
.add(
RestConstant.IS_START_WITH_SAVE_POINT,
jobImmutableInformation.isStartWithSavePoint())
.add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
.add(RestConstant.METRICS, toJsonObject(getJobMetrics(jobMetrics)));

return jobInfoJson;
}

private JsonObject toJsonObject(Map<String, Object> jobMetrics) {
JsonObject members = new JsonObject();
jobMetrics.forEach(
(key, value) -> {
if (value instanceof Map) {
members.add(key, toJsonObject((Map<String, Object>) value));
} else {
members.add(key, value.toString());
}
});
return members;
}

private JsonObject getJobInfoJson(JobState jobState, String jobMetrics, JobDAGInfo jobDAGInfo) {
return new JsonObject()
.add(RestConstant.JOB_ID, String.valueOf(jobState.getJobId()))
Expand All @@ -485,6 +550,6 @@ private JsonObject getJobInfoJson(JobState jobState, String jobMetrics, JobDAGIn
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
.add(RestConstant.JOB_DAG, JsonUtils.toJsonString(jobDAGInfo))
.add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
.add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
.add(RestConstant.METRICS, toJsonObject(getJobMetrics(jobMetrics)));
}
}
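The new toJsonObject(Map) helper nests maps as JSON objects and stringifies every other value, so the REST metrics now carry per-table counts under TableSourceReceivedCount / TableSinkWriteCount. A plain-Java mirror of that behavior for illustration (the real code builds a Hazelcast JsonObject; values here are illustrative):

import java.util.HashMap;
import java.util.Map;

final class MetricsShapeDemo {
    // Mirrors toJsonObject(): nested Maps become nested objects,
    // everything else is rendered via toString().
    @SuppressWarnings("unchecked")
    static String render(Map<String, Object> map) {
        StringBuilder sb = new StringBuilder("{");
        map.forEach((key, value) -> {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append('"').append(key).append("\":");
            if (value instanceof Map) {
                sb.append(render((Map<String, Object>) value));
            } else {
                sb.append('"').append(value).append('"');
            }
        });
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, Object> perTable = new HashMap<>();
        perTable.put("fake1", 20L);
        perTable.put("fake2", 30L);
        Map<String, Object> metrics = new HashMap<>();
        metrics.put("SourceReceivedCount", 50L);
        metrics.put("TableSourceReceivedCount", perTable);
        System.out.println(render(metrics));
        // e.g. {"SourceReceivedCount":"50","TableSourceReceivedCount":{"fake1":"20","fake2":"30"}}
    }
}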