Skip to content

Commit

Permalink
[Feature][Zeta engine] Added the metrics information of table statist…
Browse files Browse the repository at this point in the history
…ics in multi-table mode #6959
  • Loading branch information
hawk9821 committed Jul 23, 2024
1 parent a41958b commit 6081609
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 92 deletions.
78 changes: 8 additions & 70 deletions .github/workflows/backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ jobs:
- name: run updated modules integration test (part-1)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 0`
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 0`
if [ ! -z $sub_modules ]; then
./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
Expand Down Expand Up @@ -344,7 +344,7 @@ jobs:
- name: run updated modules integration test (part-2)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 1`
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 1`
if [ ! -z $sub_modules ]; then
./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
Expand Down Expand Up @@ -375,7 +375,7 @@ jobs:
- name: run updated modules integration test (part-3)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 2`
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 2`
if [ ! -z $sub_modules ]; then
./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
Expand Down Expand Up @@ -406,7 +406,7 @@ jobs:
- name: run updated modules integration test (part-4)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 3`
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 3`
if [ ! -z $sub_modules ]; then
./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
Expand Down Expand Up @@ -436,7 +436,7 @@ jobs:
- name: run updated modules integration test (part-5)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 4`
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 4`
if [ ! -z $sub_modules ]; then
./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
Expand Down Expand Up @@ -466,7 +466,7 @@ jobs:
- name: run updated modules integration test (part-6)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 5`
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 5`
if [ ! -z $sub_modules ]; then
./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
Expand Down Expand Up @@ -496,7 +496,7 @@ jobs:
- name: run updated modules integration test (part-7)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 6`
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 6`
if [ ! -z $sub_modules ]; then
./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
Expand Down Expand Up @@ -527,69 +527,7 @@ jobs:
- name: run updated modules integration test (part-8)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 7`
if [ ! -z $sub_modules ]; then
./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
echo "sub modules is empty, skipping"
fi
env:
MAVEN_OPTS: -Xmx2048m

# Integration-test job for the updated modules, part 9.
# Runs only when the `changes` job reports api == 'false' and a non-empty
# it-modules output; executed once per JDK listed in the matrix.
updated-modules-integration-test-part-9:
  needs: [ changes, sanity-check ]
  if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
  runs-on: ${{ matrix.os }}
  strategy:
    matrix:
      java: [ '8', '11' ]
      os: [ 'ubuntu-latest' ]
  timeout-minutes: 90
  steps:
    - uses: actions/checkout@v2
    - name: Set up JDK ${{ matrix.java }}
      uses: actions/setup-java@v3
      with:
        java-version: ${{ matrix.java }}
        distribution: 'temurin'
        cache: 'maven'
    - name: free disk space
      run: tools/github/free_disk_space.sh
    # Step label now matches the enclosing job (it previously read "part-8").
    - name: run updated modules integration test (part-9)
      if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
      run: |
        # NOTE(review): the trailing "10 8" presumably means "split into 10 slices,
        # take slice index 8" - confirm against update_modules_check.py.
        sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 8`
        # Quote the variable so the emptiness check stays well-formed even when
        # the value is empty or contains whitespace.
        if [ -n "$sub_modules" ]; then
          ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
        else
          echo "sub modules is empty, skipping"
        fi
      env:
        MAVEN_OPTS: -Xmx2048m

updated-modules-integration-test-part-10:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
runs-on: ${{ matrix.os }}
strategy:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 90
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v3
with:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: free disk space
run: tools/github/free_disk_space.sh
- name: run updated modules integration test (part-8)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 9`
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 7`
if [ ! -z $sub_modules ]; then
./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -103,11 +102,9 @@ public SeaTunnelSourceCollector(
tablePaths.forEach(
tablePath ->
sourceReceivedCountPerTable.put(
tablePath.getFullName(),
getFullName(tablePath),
metricsContext.counter(
SOURCE_RECEIVED_COUNT
+ "#"
+ tablePath.getFullName())));
SOURCE_RECEIVED_COUNT + "#" + getFullName(tablePath))));
}
sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT);
sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS);
Expand All @@ -116,6 +113,14 @@ public SeaTunnelSourceCollector(
flowControlGate = FlowControlGate.create(flowControlStrategy);
}

/**
 * Returns the full name of the given table path.
 *
 * <p>If the path carries a blank table name, the full name is taken from a path with the
 * same database and schema names and the table name {@code "default"} instead.
 *
 * @param tablePath the table path to resolve
 * @return the full name of the path, or of its {@code "default"}-table substitute
 */
private String getFullName(TablePath tablePath) {
    boolean hasTableName = !StringUtils.isBlank(tablePath.getTableName());
    if (hasTableName) {
        return tablePath.getFullName();
    }
    // Blank table name: substitute "default" while keeping database and schema.
    TablePath fallbackPath =
            TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default");
    return fallbackPath.getFullName();
}

@Override
public void collect(T row) {
try {
Expand All @@ -134,8 +139,7 @@ public void collect(T row) {
sourceReceivedBytesPerSeconds.markEvent(size);
flowControlGate.audit((SeaTunnelRow) row);
if (StringUtils.isNotEmpty(tableId)) {
String tableName =
Optional.of(TablePath.of(tableId).getFullName()).orElse("null");
String tableName = getFullName(TablePath.of(tableId));
Counter sourceTableCounter = sourceReceivedCountPerTable.get(tableName);
if (Objects.nonNull(sourceTableCounter)) {
sourceTableCounter.inc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.seatunnel.engine.server.task;

import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.Getter;
import lombok.NonNull;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SourceSplit;
Expand All @@ -33,13 +37,6 @@
import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.record.Barrier;

import org.apache.commons.lang3.StringUtils;

import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.Getter;
import lombok.NonNull;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -89,10 +86,6 @@ public void init() throws Exception {
tablePaths =
producedCatalogTables.stream()
.map(CatalogTable::getTableId)
.filter(
tableIdentifier ->
StringUtils.isNotBlank(
tableIdentifier.getFullName()))
.map(
tableIdentifier ->
TablePath.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ public SinkFlowLifeCycle(
sinkTables.forEach(
tablePath ->
sinkWriteCountPerTable.put(
tablePath.getTableName(),
getFullName(tablePath),
metricsContext.counter(
SINK_WRITE_COUNT + "#" + tablePath.getTableName())));
SINK_WRITE_COUNT + "#" + getFullName(tablePath))));
}
}

Expand Down Expand Up @@ -275,8 +275,7 @@ public void received(Record<?> record) {
sinkWriteBytesPerSeconds.markEvent(size);
String tableId = ((SeaTunnelRow) record.getData()).getTableId();
if (StringUtils.isNotBlank(tableId)) {
String tableName =
Optional.of(TablePath.of(tableId).getTableName()).orElse("null");
String tableName = getFullName(TablePath.of(tableId));
Counter sinkTableCounter = sinkWriteCountPerTable.get(tableName);
if (Objects.nonNull(sinkTableCounter)) {
sinkTableCounter.inc();
Expand Down Expand Up @@ -346,4 +345,12 @@ public void restoreState(List<ActionSubtaskState> actionStateList) throws Except
((SupportResourceShare) this.writer).setMultiTableResourceManager(resourceManager, 0);
}
}

/**
 * Resolves the full name for a table path, falling back to the table name
 * {@code "default"} (same database and schema) when the path's table name is blank.
 *
 * @param tablePath the table path to resolve
 * @return the full name of the effective table path
 */
private String getFullName(TablePath tablePath) {
    final TablePath effectivePath =
            StringUtils.isBlank(tablePath.getTableName())
                    ? TablePath.of(
                            tablePath.getDatabaseName(), tablePath.getSchemaName(), "default")
                    : tablePath;
    return effectivePath.getFullName();
}
}

0 comments on commit 6081609

Please sign in to comment.