-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature][Zeta engine] Added the metrics information of table statistics in multi-table mode #7088
Conversation
…ics in multi-table mode apache#6959
…ics in multi-table mode apache#6959
…ics in multi-table mode apache#6959
…ics in multi-table mode apache#6959
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @hawk9821 created this PR!
.listJobStatus(true) | ||
.contains("FINISHED"))); | ||
|
||
String jobMetrics = jobClient.getJobMetrics(jobId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about directly assert jobMetrics string value instead of use code to check the value? This way more easier to know what's metrics look likes. eg:
Assertions.assertTrue(jobMetrics.contains("
.... the multi table metrics
table1 read value = 1
table 2 read value = 2
"))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
try { | ||
JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics); | ||
StreamSupport.stream( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add test case to assert the metrics value use rest api in
Line 447 in c645d92
.body("[" + i.get() + "].metrics.SourceReceivedCount", equalTo("100")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The MultiTableMetricsIT test case has been added
tableNames.forEach( | ||
tableName -> | ||
sourceReceivedCountPerTable.put( | ||
TablePath.of(tableName).getTableName(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not direct use tableName
? It should contains more information like database name and schema name.
sourceReceivedCountPerTable.put( | ||
tableName, | ||
metricsContext.counter( | ||
SOURCE_RECEIVED_COUNT + "#" + tableName)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only put but not invoke inc
? Seem like it will lose 1 row.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
tableNames = | ||
producedCatalogTables.stream() | ||
.map(CatalogTable::getTableId) | ||
.map(TableIdentifier::getTableName) | ||
.filter(StringUtils::isNotBlank) | ||
.collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto. Please use TablePath.
tableNames = | |
producedCatalogTables.stream() | |
.map(CatalogTable::getTableId) | |
.map(TableIdentifier::getTableName) | |
.filter(StringUtils::isNotBlank) | |
.collect(Collectors.toList()); | |
tableNames = | |
producedCatalogTables.stream() | |
.map(CatalogTable::getTableId) | |
.map(TableIdentifier::toTablePath) | |
.collect(Collectors.toList()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
try { | ||
String tableId = ((SeaTunnelRow) record.getData()).getTableId(); | ||
if (StringUtils.isNotBlank(tableId)) { | ||
String tableName = TablePath.of(tableId).getTableName(); | ||
if (StringUtils.isNotBlank(tableName)) { | ||
Counter sinkTableCounter = sinkWriteCountPerTable.get(tableName); | ||
if (Objects.nonNull(sinkTableCounter)) { | ||
sinkTableCounter.inc(); | ||
} else { | ||
sinkWriteCountPerTable.put( | ||
tableName, | ||
metricsContext.counter( | ||
SINK_WRITE_COUNT + "#" + tableName)); | ||
} | ||
} | ||
} | ||
} catch (Exception e) { | ||
log.error("====================== {}", e.getMessage()); | ||
throw new RuntimeException(e); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
@@ -33,6 +33,11 @@ | |||
<artifactId>seatunnel-engine-core</artifactId> | |||
<version>${project.version}</version> | |||
</dependency> | |||
<dependency> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I I suggest that we can move multitablesink package to seatunnel-common module instead of referencing connector-common. Because our connector is engine independent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move multitablesink package to seatunnel-common module, will the impact be a little big
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Never mind. we have enough test case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The classes in multitablesink package refer to the seateunel-api module , which when moved causes seatunnel-common and seatunnel-api loop dependencies.
…ics in multi-table mode apache#6959
…ics in multi-table mode apache#6959
…ics in multi-table mode apache#6959
…ics in multi-table mode apache#6959
…ics in multi-table mode apache#6959
…ics in multi-table mode apache#6959
…ics in multi-table mode apache#6959
Please resolve conflicts. Thanks. |
@@ -313,7 +313,7 @@ jobs: | |||
- name: run updated modules integration test (part-1) | |||
if: needs.changes.outputs.api == 'false' && needs.changes.outputs.it-modules != '' | |||
run: | | |||
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 8 0` | |||
sub_modules=`python tools/update_modules_check/update_modules_check.py sub_update_it_module ${{needs.changes.outputs.it-modules}} 10 0` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why add new update_modules_check
module? I think 8 modules are enough for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The CI subtask is interrupted due to timeout.
https://github.com/hawk9821/seatunnel/actions/runs/9689364040/job/26756996209
Bumps [org.xerial.snappy:snappy-java](https://github.com/xerial/snappy-java) from 1.1.8.3 to 1.1.10.4. - [Release notes](https://github.com/xerial/snappy-java/releases) - [Commits](xerial/snappy-java@1.1.8.3...v1.1.10.4) --- updated-dependencies: - dependency-name: org.xerial.snappy:snappy-java dependency-type: direct:production ... Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…ache#7181) * [Improve] Improve error message when can not parse datetime value * update
…ics in multi-table mode apache#6959
…ics in multi-table mode apache#6959
…ics in multi-table mode apache#6959
…ics in multi-table mode apache#6959
…ics in multi-table mode apache#6959
Purpose of this pull request
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide
release-note
.