[Feature][Zeta engine] Added the metrics information of table statistics in multi-table mode #7088

Closed
wants to merge 72 commits into from

@hawk9821 hawk9821 commented Jul 1, 2024

Purpose of this pull request

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

Member

@Hisoka-X Hisoka-X left a comment

Thanks @hawk9821 for creating this PR!

.listJobStatus(true)
.contains("FINISHED")));

String jobMetrics = jobClient.getJobMetrics(jobId);
Member

How about asserting directly on the jobMetrics string value instead of using code to check the values? That makes it easier to see what the metrics look like, e.g.:

Assertions.assertTrue(jobMetrics.contains("
.... the multi table metrics
table1 read value = 1
table 2 read value = 2

"))

Contributor Author

ok
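
The suggestion above can be sketched roughly as follows. This is only an illustration: the sample payload, the metric names `SourceReceivedCount` and `SinkWriteCount`, the table names, and the helper `containsAll` are all assumptions, not the actual output of `getJobMetrics` or code from this PR. It assumes per-table entries are keyed as `<metric>#<table>`, as in the snippets under review.

```java
final class MetricsStringAssertion {
    // Hypothetical payload for illustration only; the real getJobMetrics output may differ.
    static final String SAMPLE_JOB_METRICS =
            "{\"SourceReceivedCount#fake.table1\":\"20\","
                    + "\"SourceReceivedCount#fake.table2\":\"30\","
                    + "\"SinkWriteCount#fake.table1\":\"20\","
                    + "\"SinkWriteCount#fake.table2\":\"30\"}";

    /** Returns true when every expected fragment appears verbatim in the metrics string. */
    static boolean containsAll(String jobMetrics, String... expectedFragments) {
        for (String fragment : expectedFragments) {
            if (!jobMetrics.contains(fragment)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The expected fragments double as documentation of what the metrics look like.
        boolean ok =
                containsAll(
                        SAMPLE_JOB_METRICS,
                        "\"SourceReceivedCount#fake.table1\":\"20\"",
                        "\"SinkWriteCount#fake.table2\":\"30\"");
        System.out.println(ok);
    }
}
```

In a JUnit test the boolean would typically be wrapped in `Assertions.assertTrue(...)`, so a failure points straight at the literal that is missing.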

try {
JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
StreamSupport.stream(
Member

Please add a test case that asserts the metrics values via the REST API in

Contributor Author

The MultiTableMetricsIT test case has been added

tableNames.forEach(
tableName ->
sourceReceivedCountPerTable.put(
TablePath.of(tableName).getTableName(),
Member

Why not use tableName directly? It should contain more information, such as the database name and schema name.

Comment on lines 142 to 145
sourceReceivedCountPerTable.put(
tableName,
metricsContext.counter(
SOURCE_RECEIVED_COUNT + "#" + tableName));
Member

This only calls put and never invokes inc? It seems like one row will be lost.

Contributor Author

Fixed
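
One way to avoid the lost row is to fetch or create the counter and increment it in a single step. The sketch below is not the code from this PR: `SimpleCounter` is a stand-in for the engine's `Counter` type, and `PerTableCounters`, `recordRow`, and `countFor` are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class PerTableCounters {
    /** Stand-in for the engine's Counter type; only inc() and getCount() are modeled. */
    static final class SimpleCounter {
        private final AtomicLong value = new AtomicLong();

        void inc() {
            value.incrementAndGet();
        }

        long getCount() {
            return value.get();
        }
    }

    private final Map<String, SimpleCounter> countPerTable = new ConcurrentHashMap<>();

    /** Creates the counter the first time a table is seen and always counts the current row. */
    void recordRow(String tablePath) {
        countPerTable.computeIfAbsent(tablePath, key -> new SimpleCounter()).inc();
    }

    /** Returns the number of rows recorded for the table, or 0 if none were seen. */
    long countFor(String tablePath) {
        SimpleCounter counter = countPerTable.get(tablePath);
        return counter == null ? 0L : counter.getCount();
    }

    public static void main(String[] args) {
        PerTableCounters counters = new PerTableCounters();
        counters.recordRow("db1.table1");
        counters.recordRow("db1.table1");
        counters.recordRow("db1.table2");
        System.out.println(counters.countFor("db1.table1")); // prints 2
        System.out.println(counters.countFor("db1.table2")); // prints 1
    }
}
```

Because the increment happens on the value returned by `computeIfAbsent`, the first row for a table is counted the same way as every later row.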

Comment on lines 89 to 94
tableNames =
producedCatalogTables.stream()
.map(CatalogTable::getTableId)
.map(TableIdentifier::getTableName)
.filter(StringUtils::isNotBlank)
.collect(Collectors.toList());
Member

ditto. Please use TablePath.

Suggested change
- tableNames =
-         producedCatalogTables.stream()
-                 .map(CatalogTable::getTableId)
-                 .map(TableIdentifier::getTableName)
-                 .filter(StringUtils::isNotBlank)
-                 .collect(Collectors.toList());
+ tableNames =
+         producedCatalogTables.stream()
+                 .map(CatalogTable::getTableId)
+                 .map(TableIdentifier::toTablePath)
+                 .collect(Collectors.toList());

Contributor Author

ok
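
To illustrate why the full path matters as a metrics key, here is a small self-contained sketch. It does not use the real `TablePath` class: `bareTableName` is a hypothetical helper that keeps only the text after the last dot, and the table paths are made up. It shows two tables with the same name in different databases collapsing into one key when only the bare name is used.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class TableKeyCollision {
    /** Hypothetical helper: keeps only the text after the last dot (the bare table name). */
    static String bareTableName(String fullPath) {
        int lastDot = fullPath.lastIndexOf('.');
        return lastDot < 0 ? fullPath : fullPath.substring(lastDot + 1);
    }

    /** Number of distinct metric keys when tables are keyed by bare name only. */
    static int distinctKeysByBareName(List<String> fullPaths) {
        Set<String> keys = new HashSet<>();
        for (String path : fullPaths) {
            keys.add(bareTableName(path));
        }
        return keys.size();
    }

    /** Number of distinct metric keys when tables are keyed by their full path. */
    static int distinctKeysByFullPath(List<String> fullPaths) {
        return new HashSet<>(fullPaths).size();
    }

    public static void main(String[] args) {
        // Made-up paths: same table name, different databases.
        List<String> tables = Arrays.asList("db1.public.orders", "db2.public.orders");
        System.out.println(distinctKeysByBareName(tables)); // prints 1: counts would be merged
        System.out.println(distinctKeysByFullPath(tables)); // prints 2: one counter per table
    }
}
```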

Comment on lines 278 to 297
try {
String tableId = ((SeaTunnelRow) record.getData()).getTableId();
if (StringUtils.isNotBlank(tableId)) {
String tableName = TablePath.of(tableId).getTableName();
if (StringUtils.isNotBlank(tableName)) {
Counter sinkTableCounter = sinkWriteCountPerTable.get(tableName);
if (Objects.nonNull(sinkTableCounter)) {
sinkTableCounter.inc();
} else {
sinkWriteCountPerTable.put(
tableName,
metricsContext.counter(
SINK_WRITE_COUNT + "#" + tableName));
}
}
}
} catch (Exception e) {
log.error("====================== {}", e.getMessage());
throw new RuntimeException(e);
}
Member

ditto

@@ -33,6 +33,11 @@
<artifactId>seatunnel-engine-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Member

I suggest moving the multitablesink package to the seatunnel-common module instead of referencing connector-common, because our connectors are engine-independent.

Contributor Author

Wouldn't moving the multitablesink package to the seatunnel-common module have a fairly large impact?

Member

Never mind. We have enough test cases.

Contributor Author

The classes in the multitablesink package reference the seatunnel-api module, so moving them would create a circular dependency between seatunnel-common and seatunnel-api.

@Hisoka-X
Member

Please resolve conflicts. Thanks.

@@ -313,7 +313,7 @@ jobs:
- name: run updated modules integration test (part-1)
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != ''
run: |
- sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 0`
+ sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 0`
Member

Why add a new update_modules_check module? I think 8 modules are enough for now.


@github-actions github-actions bot added document dependencies Pull requests that update a dependency file CI&CD core SeaTunnel core module connectors-v2 Zeta Rest API e2e format api labels Jul 13, 2024
tcodehuber and others added 14 commits July 15, 2024 16:59
Bumps [org.xerial.snappy:snappy-java](https://github.com/xerial/snappy-java) from 1.1.8.3 to 1.1.10.4.
- [Release notes](https://github.com/xerial/snappy-java/releases)
- [Commits](xerial/snappy-java@1.1.8.3...v1.1.10.4)

---
updated-dependencies:
- dependency-name: org.xerial.snappy:snappy-java
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…ache#7181)

* [Improve] Improve error message when can not parse datetime value

* update