diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml index 1e9c02f45a63..88a2d59e3f19 100644 --- a/.github/workflows/backend.yml +++ b/.github/workflows/backend.yml @@ -313,7 +313,7 @@ jobs: - name: run updated modules integration test (part-1) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 0` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 0` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -344,7 +344,7 @@ jobs: - name: run updated modules integration test (part-2) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 1` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 1` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -375,7 +375,7 @@ jobs: - name: run updated modules integration test (part-3) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 2` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 2` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -406,7 +406,7 @@ jobs: - name: run updated modules integration test (part-4) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 3` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 3` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -436,7 +436,7 @@ jobs: - name: run updated modules integration test (part-5) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 4` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 4` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -466,7 +466,7 @@ jobs: - name: run updated modules integration test (part-6) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 5` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 5` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -496,7 +496,7 @@ jobs: - name: run updated modules integration test (part-7) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 6` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 6` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else @@ -527,69 +527,7 @@ jobs: - name: run updated modules integration test (part-8) if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 7` - if [ ! -z $sub_modules ]; then - ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci - else - echo "sub modules is empty, skipping" - fi - env: - MAVEN_OPTS: -Xmx2048m - - updated-modules-integration-test-part-9: - needs: [ changes, sanity-check ] - if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' - runs-on: ${{ matrix.os }} - strategy: - matrix: - java: [ '8', '11' ] - os: [ 'ubuntu-latest' ] - timeout-minutes: 90 - steps: - - uses: actions/checkout@v2 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v3 - with: - java-version: ${{ matrix.java }} - distribution: 'temurin' - cache: 'maven' - - name: free disk space - run: tools/github/free_disk_space.sh - - name: run updated modules integration test (part-8) - if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' - run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 8` - if [ ! -z $sub_modules ]; then - ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci - else - echo "sub modules is empty, skipping" - fi - env: - MAVEN_OPTS: -Xmx2048m - - updated-modules-integration-test-part-10: - needs: [ changes, sanity-check ] - if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' - runs-on: ${{ matrix.os }} - strategy: - matrix: - java: [ '8', '11' ] - os: [ 'ubuntu-latest' ] - timeout-minutes: 90 - steps: - - uses: actions/checkout@v2 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v3 - with: - java-version: ${{ matrix.java }} - distribution: 'temurin' - cache: 'maven' - - name: free disk space - run: tools/github/free_disk_space.sh - - name: run updated modules integration test (part-8) - if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' - run: | - sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 9` + sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 7` if [ ! -z $sub_modules ]; then ./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl $sub_modules -am -Pci else diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java index cd51c680928f..160324da7001 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java @@ -45,7 +45,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -103,11 +102,9 @@ public SeaTunnelSourceCollector( tablePaths.forEach( tablePath -> sourceReceivedCountPerTable.put( - tablePath.getFullName(), + getFullName(tablePath), metricsContext.counter( - SOURCE_RECEIVED_COUNT - + "#" - + tablePath.getFullName()))); + SOURCE_RECEIVED_COUNT + "#" + getFullName(tablePath)))); } sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT); sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS); @@ -116,6 +113,14 @@ public SeaTunnelSourceCollector( flowControlGate = FlowControlGate.create(flowControlStrategy); } + private String getFullName(TablePath tablePath) { + if (StringUtils.isBlank(tablePath.getTableName())) { + tablePath = + TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default"); + } + return tablePath.getFullName(); + } + @Override public void collect(T row) { try { @@ -134,8 +139,7 @@ public void collect(T row) { sourceReceivedBytesPerSeconds.markEvent(size); flowControlGate.audit((SeaTunnelRow) row); if (StringUtils.isNotEmpty(tableId)) { - String tableName = - Optional.of(TablePath.of(tableId).getFullName()).orElse("null"); + String tableName = getFullName(TablePath.of(tableId)); Counter sourceTableCounter = sourceReceivedCountPerTable.get(tableName); if (Objects.nonNull(sourceTableCounter)) { sourceTableCounter.inc(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java index 358843dd2876..5953140802e0 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java @@ -17,6 +17,10 @@ package org.apache.seatunnel.engine.server.task; +import com.hazelcast.logging.ILogger; +import com.hazelcast.logging.Logger; +import lombok.Getter; +import lombok.NonNull; import org.apache.seatunnel.api.common.metrics.MetricsContext; import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.source.SourceSplit; @@ -33,13 +37,6 @@ import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle; import org.apache.seatunnel.engine.server.task.record.Barrier; -import org.apache.commons.lang3.StringUtils; - -import com.hazelcast.logging.ILogger; -import com.hazelcast.logging.Logger; -import lombok.Getter; -import lombok.NonNull; - import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -89,10 +86,6 @@ public void init() throws Exception { tablePaths = producedCatalogTables.stream() .map(CatalogTable::getTableId) - .filter( - tableIdentifier -> - StringUtils.isNotBlank( - tableIdentifier.getFullName())) .map( tableIdentifier -> TablePath.of( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 756327cf0094..03474aede28d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -138,9 +138,9 @@ public SinkFlowLifeCycle( sinkTables.forEach( tablePath -> sinkWriteCountPerTable.put( - tablePath.getTableName(), + getFullName(tablePath), metricsContext.counter( - SINK_WRITE_COUNT + "#" + tablePath.getTableName()))); + SINK_WRITE_COUNT + "#" + getFullName(tablePath)))); } } @@ -275,8 +275,7 @@ public void received(Record record) { sinkWriteBytesPerSeconds.markEvent(size); String tableId = ((SeaTunnelRow) record.getData()).getTableId(); if (StringUtils.isNotBlank(tableId)) { - String tableName = - Optional.of(TablePath.of(tableId).getTableName()).orElse("null"); + String tableName = getFullName(TablePath.of(tableId)); Counter sinkTableCounter = sinkWriteCountPerTable.get(tableName); if (Objects.nonNull(sinkTableCounter)) { sinkTableCounter.inc(); @@ -346,4 +345,12 @@ public void restoreState(List actionStateList) throws Except ((SupportResourceShare) this.writer).setMultiTableResourceManager(resourceManager, 0); } } + + private String getFullName(TablePath tablePath) { + if (StringUtils.isBlank(tablePath.getTableName())) { + tablePath = + TablePath.of(tablePath.getDatabaseName(), tablePath.getSchemaName(), "default"); + } + return tablePath.getFullName(); + } }